Skip to content

Commit

Permalink
Fix failing unit tests, use QueryLibrary properly in rolling operator…
Browse files Browse the repository at this point in the history
… tests, and enable a pattern for "long-lived" chunk allocations that should not be tracked in the ChunkPoolReleaseTracking, applied to RowSet chunk views.
  • Loading branch information
rcaudy committed Jun 4, 2024
1 parent 36fcef1 commit 98fdfe9
Show file tree
Hide file tree
Showing 23 changed files with 669 additions and 642 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
import io.deephaven.util.datastructures.ReleaseTracker;
import org.jetbrains.annotations.NotNull;

import java.util.function.Supplier;

/**
* Support for release tracking, in order to detect chunk release errors.
*/
public final class ChunkPoolReleaseTracking {

private static volatile ReleaseTracker<PoolableChunk> releaseTracker;

private static final ThreadLocal<Supplier<ReleaseTracker<PoolableChunk>>> threadLocalReleaseTrackerSupplier =
ThreadLocal.withInitial(() -> () -> releaseTracker);

public static void enableStrict() {
enable(ReleaseTracker.strictReleaseTrackerFactory, true);
}
Expand Down Expand Up @@ -46,24 +51,44 @@ public static void disable() {
releaseTracker = null;
}

public static <CHUNK_TYPE extends PoolableChunk> CHUNK_TYPE untracked(@NotNull final Supplier<CHUNK_TYPE> acquire) {
final Supplier<ReleaseTracker<PoolableChunk>> original = threadLocalReleaseTrackerSupplier.get();
try {
threadLocalReleaseTrackerSupplier.set(() -> null);
return acquire.get();
} finally {
threadLocalReleaseTrackerSupplier.set(original);
}
}

public static void untracked(@NotNull final Runnable release) {
final Supplier<ReleaseTracker<PoolableChunk>> original = threadLocalReleaseTrackerSupplier.get();
try {
threadLocalReleaseTrackerSupplier.set(() -> null);
release.run();
} finally {
threadLocalReleaseTrackerSupplier.set(original);
}
}

static <CHUNK_TYPE extends PoolableChunk> CHUNK_TYPE onTake(@NotNull final CHUNK_TYPE chunk) {
final ReleaseTracker<PoolableChunk> localReleaseTracker = releaseTracker;
final ReleaseTracker<PoolableChunk> localReleaseTracker = threadLocalReleaseTrackerSupplier.get().get();
if (localReleaseTracker != null) {
localReleaseTracker.reportAcquire(chunk);
}
return chunk;
}

static <CHUNK_TYPE extends PoolableChunk> CHUNK_TYPE onGive(@NotNull final CHUNK_TYPE chunk) {
final ReleaseTracker<PoolableChunk> localReleaseTracker = releaseTracker;
final ReleaseTracker<PoolableChunk> localReleaseTracker = threadLocalReleaseTrackerSupplier.get().get();
if (localReleaseTracker != null) {
localReleaseTracker.reportRelease(chunk);
}
return chunk;
}

public static void check() {
final ReleaseTracker<PoolableChunk> localReleaseTracker = releaseTracker;
final ReleaseTracker<PoolableChunk> localReleaseTracker = threadLocalReleaseTrackerSupplier.get().get();
if (localReleaseTracker != null) {
localReleaseTracker.check();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.engine.rowset.impl;

import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeyRanges;
Expand All @@ -20,7 +21,7 @@ public abstract class RowSequenceAsChunkImpl implements RowSequence {

private void makeKeyIndicesChunk() {
final int isize = intSize();
keyIndicesChunk = WritableLongChunk.makeWritableChunk(isize);
keyIndicesChunk = ChunkPoolReleaseTracking.untracked(() -> WritableLongChunk.makeWritableChunk(isize));
}

protected long runsUpperBound() {
Expand All @@ -42,7 +43,7 @@ private int sizeForRangesChunk() {

private void makeKeyRangesChunk(final int size) {
final WritableLongChunk<OrderedRowKeyRanges> chunk =
WritableLongChunk.makeWritableChunk(size);
ChunkPoolReleaseTracking.untracked(() -> WritableLongChunk.makeWritableChunk(size));
keyRangesChunk = chunk;
}

Expand All @@ -57,7 +58,7 @@ public final LongChunk<OrderedRowKeys> asRowKeyChunk() {
keyIndicesChunk.setSize(keyIndicesChunk.capacity());
fillRowKeyChunk(keyIndicesChunk);
} else {
keyIndicesChunk.close();
ChunkPoolReleaseTracking.untracked(keyIndicesChunk::close);
keyIndicesChunk = null;
}
}
Expand All @@ -82,7 +83,7 @@ public final LongChunk<OrderedRowKeyRanges> asRowKeyRangesChunk() {
if (keyRangesChunk.capacity() >= size) {
fillRowKeyRangesChunk(keyRangesChunk);
} else {
keyRangesChunk.close();
ChunkPoolReleaseTracking.untracked(keyRangesChunk::close);
keyRangesChunk = null;
}
}
Expand Down Expand Up @@ -114,11 +115,11 @@ public void close() {
*/
protected final void closeRowSequenceAsChunkImpl() {
if (keyIndicesChunk != null) {
keyIndicesChunk.close();
ChunkPoolReleaseTracking.untracked(keyIndicesChunk::close);
keyIndicesChunk = null;
}
if (keyRangesChunk != null) {
keyRangesChunk.close();
ChunkPoolReleaseTracking.untracked(keyRangesChunk::close);
keyRangesChunk = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ public void testStaticBy() {
TestCase.assertEquals(int.class, grouped.getColumnSource("j").getType());

table = newTable(intCol("V", 100));
grouped = table.updateView("j=i").groupBy("j");
TestCase.assertEquals(1, grouped.size());
TestCase.assertEquals(2, grouped.numColumns());
TestCase.assertEquals(int.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static io.deephaven.datastructures.util.CollectionUtil.ZERO_LENGTH_STRING_ARRAY;
import static io.deephaven.engine.testutil.testcase.RefreshingTableTestCase.printTableUpdates;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.engine.testutil.TstUtils.*;
Expand Down Expand Up @@ -621,9 +622,9 @@ public void testAjEmptyRight() {
col("LSentinel", "a", "b", "c", "d"));

final QueryTable right = TstUtils.testRefreshingTable(i().toTracking(),
col("Group", CollectionUtil.ZERO_LENGTH_STRING_ARRAY),
col("Group", ZERO_LENGTH_STRING_ARRAY),
intCol("RInt"),
col("RSentinel"));
col("RSentinel", ZERO_LENGTH_STRING_ARRAY));

System.out.println("Left:");
TableTools.show(left);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,13 +1302,18 @@ public void testReverse2() {

}

private void checkReverse(QueryTable table, Table reversed, String timestamp) {
private void checkReverse(QueryTable table, Table reversed, String columnName) {
assertEquals(table.size(), reversed.size());
for (long ii = 0; ii < table.size(); ++ii) {
final long jj = table.size() - ii - 1;
assertEquals(
ColumnVectors.ofObject(table, timestamp, Object.class).get(ii),
ColumnVectors.ofObject(reversed, timestamp, Object.class).get(jj));
final ColumnSource<?> tableSource = table.getColumnSource(columnName);
final ColumnSource<?> reversedSource = reversed.getColumnSource(columnName);
try (final RowSet.Iterator tableRows = table.getRowSet().iterator();
final RowSet.Iterator reverseRows = reversed.getRowSet().reverseIterator()) {
while (tableRows.hasNext()) {
assertTrue(reverseRows.hasNext());
final long tableRow = tableRows.nextLong();
final long reverseRow = reverseRows.nextLong();
assertEquals(tableSource.get(tableRow), reversedSource.get(reverseRow));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.deephaven.vector.CharVector;
import io.deephaven.vector.DoubleVector;
import io.deephaven.vector.IntVector;
import io.deephaven.vector.LongVector;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -82,10 +83,10 @@ public void testBy() {
show(doubleCounted);
assertEquals(2, doubleCounted.size());

IntVector counts = ColumnVectors.ofInt(doubleCounted, "Count1");
LongVector counts = ColumnVectors.ofLong(doubleCounted, "Count1");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));
counts = ColumnVectors.ofInt(doubleCounted, "Count2");
counts = ColumnVectors.ofLong(doubleCounted, "Count2");
assertEquals(6L, counts.get(0));
assertEquals(4L, counts.get(1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.deephaven.engine.testutil.EvalNugget;
import io.deephaven.engine.testutil.EvalNuggetInterface;
import io.deephaven.engine.util.TotalsTableBuilder;
import io.deephaven.util.QueryConstants;

import java.util.Arrays;
import java.util.LinkedHashSet;
Expand All @@ -27,17 +26,6 @@ public class TestTotalsTable extends RefreshingTableTestCase {

private static final double EPSILON = 0.000000001;

private long shortSum(short[] values) {
long sum = 0;
for (short value : values) {
if (value == QueryConstants.NULL_SHORT) {
continue;
}
sum += value;
}
return sum;
}

public void testTotalsTable() {
final int size = 1000;
final Random random = new Random(0);
Expand Down Expand Up @@ -66,15 +54,15 @@ public void testTotalsTable() {
"floatCol", "byteCol", "shortCol")), resultColumns);

assertEquals(sum(ColumnVectors.ofInt(queryTable, "intCol")),
totals.getColumnSource("intCol").getInt(totals.getRowSet().firstRowKey()));
totals.getColumnSource("intCol").getLong(totals.getRowSet().firstRowKey()));
assertEquals(sum(ColumnVectors.ofDouble(queryTable, "doubleCol")),
totals.getColumnSource("doubleCol").getDouble(totals.getRowSet().firstRowKey()));
assertEquals(sum(ColumnVectors.ofDouble(queryTable, "doubleNullCol")),
totals.getColumnSource("doubleNullCol").getDouble(totals.getRowSet().firstRowKey()));
assertEquals(sum(ColumnVectors.ofFloat(queryTable, "floatCol")),
totals.getColumnSource("floatCol").getFloat(totals.getRowSet().firstRowKey()), 0.02);
assertEquals(sum(ColumnVectors.ofShort(queryTable, "shortCol")),
totals.getColumnSource("shortCol").getShort(totals.getRowSet().firstRowKey()));
totals.getColumnSource("shortCol").getLong(totals.getRowSet().firstRowKey()));

builder.setDefaultOperation("skip");
builder.setOperation("byteCol", "min");
Expand All @@ -89,9 +77,8 @@ public void testTotalsTable() {
totals2.getColumnSource("byteCol").getByte(totals2.getRowSet().firstRowKey()));
assertEquals(queryTable.getColumnSource("Sym").get(queryTable.getRowSet().firstRowKey()),
totals2.getColumnSource("Sym").get(totals2.getRowSet().firstRowKey()));
assertEquals(queryTable.getColumnSource("intCol2").getInt(queryTable.getRowSet().get(queryTable.size() - 1)),
totals2.getColumnSource("intCol2")
.getInt(queryTable.getRowSet().get(totals2.getRowSet().firstRowKey())));
assertEquals(queryTable.getColumnSource("intCol2").getInt(queryTable.getRowSet().lastRowKey()),
totals2.getColumnSource("intCol2").getInt(totals2.getRowSet().get(totals2.getRowSet().firstRowKey())));

builder.setOperation("byteCol", "max");
builder.setOperation("doubleCol", "var");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Context;
import io.deephaven.engine.table.DataColumn;
import io.deephaven.engine.table.Table;
import static io.deephaven.engine.table.impl.select.ConditionFilter.FilterKernel;
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.type.ArrayTypeUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Context;
import io.deephaven.engine.table.DataColumn;
import io.deephaven.engine.table.Table;
import static io.deephaven.engine.table.impl.select.ConditionFilter.FilterKernel;
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.type.ArrayTypeUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Context;
import io.deephaven.engine.table.DataColumn;
import io.deephaven.engine.table.Table;
import static io.deephaven.engine.table.impl.select.ConditionFilter.FilterKernel;
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.type.ArrayTypeUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Context;
import io.deephaven.engine.table.DataColumn;
import io.deephaven.engine.table.Table;
import static io.deephaven.engine.table.impl.select.ConditionFilter.FilterKernel;
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.util.type.ArrayTypeUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ private void doTestStaticBucketed(boolean grouped) {
preOp.partitionedTransform(postOp, (source, actual) -> {
Arrays.stream(columns).forEach(col -> {
assertWithCumProd(
ColumnVectors.of(source, col).getDirect(),
ColumnVectors.of(actual, col).getDirect(),
ColumnVectors.of(source, col).toArray(),
ColumnVectors.of(actual, col).toArray(),
actual.getDefinition().getColumn(col).getDataType());
});
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ private void doTestStaticBucketed(boolean grouped) {
preOp.partitionedTransform(postOp, (source, actual) -> {
Arrays.stream(columns).forEach(col -> {
assertWithDelta(
ColumnVectors.of(source, col).getDirect(),
ColumnVectors.of(actual, col).getDirect(),
ColumnVectors.of(source, col).toArray(),
ColumnVectors.of(actual, col).toArray(),
DeltaControl.DEFAULT);
});
return source;
Expand All @@ -197,8 +197,8 @@ private void doTestStaticBucketed(boolean grouped) {
preOp.partitionedTransform(postOp, (source, actual) -> {
Arrays.stream(columns).forEach(col -> {
assertWithDelta(
ColumnVectors.of(source, col).getDirect(),
ColumnVectors.of(actual, col).getDirect(),
ColumnVectors.of(source, col).toArray(),
ColumnVectors.of(actual, col).toArray(),
DeltaControl.NULL_DOMINATES);
});
return source;
Expand All @@ -212,8 +212,8 @@ private void doTestStaticBucketed(boolean grouped) {
preOp.partitionedTransform(postOp, (source, actual) -> {
Arrays.stream(columns).forEach(col -> {
assertWithDelta(
ColumnVectors.of(source, col).getDirect(),
ColumnVectors.of(actual, col).getDirect(),
ColumnVectors.of(source, col).toArray(),
ColumnVectors.of(actual, col).toArray(),
DeltaControl.VALUE_DOMINATES);
});
return source;
Expand All @@ -227,8 +227,8 @@ private void doTestStaticBucketed(boolean grouped) {
preOp.partitionedTransform(postOp, (source, actual) -> {
Arrays.stream(columns).forEach(col -> {
assertWithDelta(
ColumnVectors.of(source, col).getDirect(),
ColumnVectors.of(actual, col).getDirect(),
ColumnVectors.of(source, col).toArray(),
ColumnVectors.of(actual, col).toArray(),
DeltaControl.ZERO_DOMINATES);
});
return source;
Expand Down
Loading

0 comments on commit 98fdfe9

Please sign in to comment.