From 5841eec29f0d812d82cfc617c62be4d5b9e3b4f6 Mon Sep 17 00:00:00 2001
From: "Charles P. Wright"
Date: Thu, 23 Jan 2025 20:21:32 -0500
Subject: [PATCH] perf: DH-18300: Improve DataIndex performance. (#6585)

DataIndex, particularly when used for where() filters, had missed parallelization opportunities and would read more data than strictly necessary to satisfy the filter.

---------

Co-authored-by: Ryan Caudy
---
 .../java/io/deephaven/base/AtomicUtil.java | 41 +++
 .../base/stats/ThreadSafeCounter.java | 30 ++
 .../deephaven/base/stats/ThreadSafeValue.java | 37 ++
 .../java/io/deephaven/base/stats/Value.java | 18 +-
 .../io/deephaven/base/stats/TestValue.java | 39 +++
 CONTRIBUTING.md | 5 +
 .../engine/table/BasicDataIndex.java | 35 +-
 .../io/deephaven/engine/table/DataIndex.java | 16 +-
 .../engine/table/DataIndexOptions.java | 74 ++++
 .../benchmark/engine/GroupByBenchmark.java | 14 -
 .../benchmark/engine/LastByBenchmark.java | 5 -
 .../table/impl/AbstractColumnSource.java | 201 ++++++++---
 .../engine/table/impl/JoinControl.java | 4 +
 .../engine/table/impl/QueryTable.java | 40 ++-
 .../impl/dataindex/RemappedDataIndex.java | 9 +-
 .../impl/dataindex/StandaloneDataIndex.java | 3 +-
 .../impl/dataindex/TableBackedDataIndex.java | 7 +-
 .../impl/dataindex/TransformedDataIndex.java | 8 +-
 .../select/analyzers/SelectColumnLayer.java | 5 +-
 .../sources/regioned/MergedDataIndex.java | 328 +++++++++++++++---
 .../regioned/PartitioningColumnDataIndex.java | 5 +-
 .../TestRegionedColumnSourceManager.java | 17 +-
 22 files changed, 775 insertions(+), 166 deletions(-)
 create mode 100644 Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java
 create mode 100644 Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java
 create mode 100644 engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java

diff --git a/Base/src/main/java/io/deephaven/base/AtomicUtil.java b/Base/src/main/java/io/deephaven/base/AtomicUtil.java index 43aa96ac37b..866d11f5ee6 100644
--- a/Base/src/main/java/io/deephaven/base/AtomicUtil.java
+++ b/Base/src/main/java/io/deephaven/base/AtomicUtil.java
@@ -5,6 +5,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;

 public abstract class AtomicUtil {

@@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) {
 } while (!i.compareAndSet(expect, update));
 return update;
 }
+
+ /**
+ * Sets the field to the minimum of the current value and the passed in value
+ *
+ * @param o the object to update
+ * @param fu the field updater
+ * @param value the value that is a candidate for the minimum
+ * @return true if the minimum was set
+ * @param <T> the type of o
+ */
+ public static <T> boolean setMin(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
+ long current = fu.get(o);
+ while (current > value) {
+ if (fu.compareAndSet(o, current, value)) {
+ return true;
+ }
+ current = fu.get(o);
+ }
+ return false;
+ }
+
+ /**
+ * Sets the field to the maximum of the current value and the passed in value
+ *
+ * @param o the object to update
+ * @param fu the field updater
+ * @param value the value that is a candidate for the maximum
+ * @return true if the maximum was set
+ * @param <T> the type of o
+ */
+ public static <T> boolean setMax(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
+ long current = fu.get(o);
+ while (value > current) {
+ if (fu.compareAndSet(o, current, value)) {
+ return true;
+ }
+ current = fu.get(o);
+ }
+ return
false; + } } diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java new file mode 100644 index 00000000000..d66756187d3 --- /dev/null +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java @@ -0,0 +1,30 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.base.stats; + +import java.util.function.LongFunction; + +// -------------------------------------------------------------------- + +/** + * A statistic where each value represents an additive quantity, and thus the sum of the values does have + * meaning. Examples include event counts and processing duration. If the sum of the values does not have a + * useful interpretation, use {@link State} instead. + *
<ul>
+ * <li>{@link #increment} updates the counter, recording a single value. This is the most common usage. ({@link #sample}
+ * does exactly the same thing but is a poor verb to use with a Counter.)
+ * </ul>
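+ *
+ * <p>
+ * Illustrative sketch only (an editorial example, not part of this change; {@code doFilter} is hypothetical): a
+ * counter is typically created via {@link Stats#makeItem} and sampled once per event, as the engine code later in
+ * this patch does:
+ * </p>
+ *
+ * <pre>{@code
+ * private static final Value FILTER_MILLIS =
+ *         Stats.makeItem("MyComponent", "filterMillis", ThreadSafeCounter.FACTORY,
+ *                 "Duration of filter() in millis").getValue();
+ *
+ * final long t0 = System.nanoTime();
+ * try {
+ *     doFilter(); // hypothetical unit of work being timed
+ * } finally {
+ *     FILTER_MILLIS.sample((System.nanoTime() - t0) / 1_000_000); // one sample: adds to count and total millis
+ * }
+ * }</pre>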
+ */
+public class ThreadSafeCounter extends ThreadSafeValue {
+
+ public ThreadSafeCounter(final long now) {
+ super(now);
+ }
+
+ public char getTypeTag() {
+ return Counter.TYPE_TAG;
+ }
+
+ public static final LongFunction<ThreadSafeCounter> FACTORY = ThreadSafeCounter::new;
+}
diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java new file mode 100644 index 00000000000..b0cee5a2d14
--- /dev/null
+++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java
@@ -0,0 +1,37 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.base.stats;
+
+import io.deephaven.base.AtomicUtil;
+
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * A thread-safe extension of the {@link Value} class.
+ *
+ * <p>
+ * The {@link #sample(long)} method is synchronized, so may introduce contention compared to the unsafe Value version of
+ * sample.
+ * </p>
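+ *
+ * <p>
+ * A minimal illustrative sketch (editorial example, not part of this change): concurrent callers may safely share
+ * one instance, because {@code sample} is synchronized:
+ * </p>
+ *
+ * <pre>{@code
+ * final Value v = ThreadSafeCounter.FACTORY.apply(System.currentTimeMillis());
+ * java.util.stream.IntStream.range(0, 100).parallel().forEach(v::sample);
+ * // v.getN() == 100 is guaranteed here; with a plain Value, concurrent updates could be lost
+ * }</pre>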
+ */ +public abstract class ThreadSafeValue extends Value { + + public ThreadSafeValue(long now) { + super(now); + } + + protected ThreadSafeValue(History history) { + super(history); + } + + @Override + public synchronized void sample(final long x) { + super.sample(x); + } + + @Override + public synchronized String toString() { + return super.toString(); + } +} diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index cb8c098b9b4..dd260221f1d 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -4,7 +4,10 @@ package io.deephaven.base.stats; public abstract class Value { - + /** + * The sample(long) method is not thread safe; and you can get wrong answers out of it. If you require safety, you + * should instead use a ThreadSafeValue. + */ protected long n = 0; protected long last = 0; protected long sum = 0; @@ -51,7 +54,7 @@ protected Value(History history) { this.history = history; } - public void sample(long x) { + public void sample(final long x) { n++; last = x; sum += x; @@ -99,4 +102,15 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon } } } + + @Override + public String toString() { + if (n > 0) { + final double std = Math.sqrt(n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN); + final double avg = (double) sum / n; + return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg, + std); + } + return String.format("Value{n=%,d}", n); + } } diff --git a/Base/src/test/java/io/deephaven/base/stats/TestValue.java b/Base/src/test/java/io/deephaven/base/stats/TestValue.java index 2b841c7ae3e..a1ab7eff36d 100644 --- a/Base/src/test/java/io/deephaven/base/stats/TestValue.java +++ b/Base/src/test/java/io/deephaven/base/stats/TestValue.java @@ -5,6 +5,7 @@ import junit.framework.TestCase; +import java.util.concurrent.Semaphore; import java.util.function.LongFunction; // -------------------------------------------------------------------- @@ -49,6 +50,44 @@ public void testCounter() { checkValue(Counter.FACTORY); } + public void testToString() { + // this is purposefully a heisentest, this should make it fail if it is really broken + for (int ii = 0; ii < 10; ++ii) { + doTestToString(); + } + } + + public void doTestToString() { + // we are testing toString, but also creating a pile of threads to exercise some of the AtomicFieldUpdater + // behavior of the value + final Value counter = ThreadSafeCounter.FACTORY.apply(0L); + + final String emptyString = counter.toString(); + assertEquals("Value{n=0}", emptyString); + + final Semaphore semaphore = new Semaphore(0); + final Semaphore completion = new Semaphore(0); + final int n = 100; + for (int ii = 0; ii < n; ++ii) { + final int fii = ii; + new Thread(() -> { + semaphore.acquireUninterruptibly(); + counter.sample(fii); + completion.release(); + }).start(); + } + semaphore.release(100); + completion.acquireUninterruptibly(100); + + assertEquals(n, counter.getN()); + assertEquals(((n - 1) * n) / 2, counter.getSum()); + assertEquals(0, counter.getMin()); + assertEquals(99, counter.getMax()); + + final String asString = counter.toString(); + assertEquals("Value{n=100, sum=4,950, max=99, min=0, avg=49.500, std=29.011}", asString); + } + // ---------------------------------------------------------------- private void checkValue(LongFunction factory) { Value value = factory.apply(1000L); diff --git 
a/CONTRIBUTING.md b/CONTRIBUTING.md index 39aea6d0072..e82274c805a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -54,6 +54,11 @@ For more information, see:
 * [gh pr create](https://cli.github.com/manual/gh_pr_create)
 * [CLI In Use](https://cli.github.com/manual/examples.html)

+## Labels
+
+- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
+- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label.
+
 ## Styleguide
 The [styleguide](style/README.md) is applied globally to the entire project, except for generated code that gets checked in. To apply the styleguide, run `./gradlew spotlessApply`.
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index 13ff41aa000..f13141073b1 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
@@ -96,16 +96,46 @@ default ColumnSource<?>[] keyColumns(@NotNull final ColumnSource<?>[] indexedCol
 @FinalDefault
 @NotNull
 default ColumnSource<RowSet> rowSetColumn() {
- return table().getColumnSource(rowSetColumnName(), RowSet.class);
+ return rowSetColumn(DataIndexOptions.DEFAULT);
+ }
+
+ /**
+ * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}.
+ *
+ * @param options parameters for controlling how the table will be built (if necessary) in order to retrieve the
+ * result {@link RowSet} {@link ColumnSource}
+ *
+ * @return The {@link RowSet} {@link ColumnSource}
+ */
+ @FinalDefault
+ @NotNull
+ default ColumnSource<RowSet> rowSetColumn(final DataIndexOptions options) {
+ return table(options).getColumnSource(rowSetColumnName(), RowSet.class);
+ }

 /**
 * Get the {@link Table} backing this data index.
+ *
+ * <p>
+ * The returned table is fully in-memory, equivalent to {@link #table(DataIndexOptions)} with default options.
+ * </p>
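+ *
+ * <p>
+ * For example (editorial sketch, not part of this change):
+ * </p>
+ *
+ * <pre>{@code
+ * final Table full = dataIndex.table(); // entire index, fully in-memory
+ * final Table partial = dataIndex.table(DataIndexOptions.USING_PARTIAL_TABLE); // may defer RowSet merges
+ * }</pre>
+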
* * @return The {@link Table} */ @NotNull - Table table(); + default Table table() { + return table(DataIndexOptions.DEFAULT); + } + + /** + * Get the {@link Table} backing this data index. + * + * @param options parameters to control the returned table + * + * @return The {@link Table} + */ + @NotNull + Table table(DataIndexOptions options); /** * Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force @@ -115,4 +145,5 @@ default ColumnSource rowSetColumn() { * otherwise */ boolean isRefreshing(); + } diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index b257bdbc8d0..f248afb014c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -48,7 +48,21 @@ interface RowKeyLookup { * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key */ @NotNull - RowKeyLookup rowKeyLookup(); + default RowKeyLookup rowKeyLookup() { + return rowKeyLookup(DataIndexOptions.DEFAULT); + } + + /** + * Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is + * {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be + * in the order of the index's key columns. + * + * @param options parameters for building the table, if required by this RowKeyLookup + * + * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key + */ + @NotNull + RowKeyLookup rowKeyLookup(DataIndexOptions options); /** * Return a {@link RowKeyLookup lookup function} function of index row keys for this index. If diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java new file mode 100644 index 00000000000..2498d36966d --- /dev/null +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -0,0 +1,74 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.api.filter.Filter; +import org.immutables.value.Value; + +/** + * Options for controlling the function of a {@link DataIndex}. + * + */ +@Value.Immutable +@BuildableStyle +public interface DataIndexOptions { + /** + * Static default options, which expect that operations will use the full table. + */ + DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); + + /** + * Static options for operations that use a partial table instead of the full table. + */ + DataIndexOptions USING_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); + + /** + * Does this operation use only a subset of the DataIndex? + * + *
<p>
+ * The DataIndex implementation may use this hint to defer work for some row sets.
+ * </p>
+ *
+ * <p>
+ * Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing
+ * {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred.
+ * </p>
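+ *
+ * <p>
+ * For example (editorial sketch, not part of this change), a highly selective filter can request a partial
+ * table:
+ * </p>
+ *
+ * <pre>{@code
+ * final DataIndexOptions options = DataIndexOptions.builder()
+ *         .operationUsesPartialTable(true)
+ *         .build(); // equivalent to DataIndexOptions.USING_PARTIAL_TABLE
+ * final Table indexTable = dataIndex.table(options);
+ * }</pre>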
+ *
+ * @return if this operation is only going to use a subset of this data index
+ */
+ @Value.Default
+ default boolean operationUsesPartialTable() {
+ return false;
+ }
+
+ /**
+ * Create a new builder for a {@link DataIndexOptions}.
+ *
+ * @return a new {@link Builder}
+ */
+ static Builder builder() {
+ return ImmutableDataIndexOptions.builder();
+ }
+
+ /**
+ * The builder interface to construct a {@link DataIndexOptions}.
+ */
+ interface Builder {
+ /**
+ * Set whether this operation only uses a subset of the data index.
+ *
+ * @param usesPartialTable true if this operation only uses a partial table
+ * @return this builder
+ */
+ Builder operationUsesPartialTable(boolean usesPartialTable);
+
+ /**
+ * Build the {@link DataIndexOptions}.
+ *
+ * @return an immutable DataIndexOptions structure.
+ */
+ DataIndexOptions build();
+ }
+}
diff --git a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java index 50af9c1fff5..c544aeb41ce 100644
--- a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java
+++ b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java
@@ -38,9 +38,6 @@ public class GroupByBenchmark {
 @Param({"Historical"})
 private String tableType;

- @Param({"operator"})
- private String mode;
-
 @Param({"String", "Int", "Composite"})
 private String keyType;

@@ -122,17 +119,6 @@ public void setupEnv(BenchmarkParams params) {
 state = new TableBenchmarkState(BenchmarkTools.stripName(params.getBenchmark()), params.getWarmup().getCount());
 table = bmt.getTable().coalesce().dropColumns("PartCol");
-
- switch (mode) {
- case "chunked":
- QueryTable.USE_OLDER_CHUNKED_BY = true;
- break;
- case "operator":
- QueryTable.USE_OLDER_CHUNKED_BY = false;
- break;
- default:
- throw new IllegalArgumentException("Unknown mode " + mode);
- }
 }

 @TearDown(Level.Trial)
diff --git a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java index c3dcc331c8e..c0ee0ada285 100644
--- a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java
+++ b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java
@@ -59,9 +59,6 @@ public class LastByBenchmark {
 @Param({"1"})
 private int valueCount;

- @Param({"true"})
- private boolean tracked;
-
 private Table table;

 private String keyName;
@@ -171,8 +168,6 @@ public void setupEnv(BenchmarkParams params) {
 state = new TableBenchmarkState(BenchmarkTools.stripName(params.getBenchmark()), params.getWarmup().getCount());
 table = bmt.getTable().coalesce().dropColumns("PartCol");
-
- QueryTable.TRACKED_FIRST_BY = QueryTable.TRACKED_LAST_BY = tracked;
 }

 @TearDown(Level.Trial)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index 65d434d6598..2de486cbd02 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java
@@ -3,6 +3,9 @@
 //
 package io.deephaven.engine.table.impl;

+import io.deephaven.base.stats.Stats;
+import io.deephaven.base.stats.ThreadSafeCounter;
+import io.deephaven.base.stats.Value;
 import io.deephaven.base.string.cache.CharSequenceUtils;
 import
io.deephaven.base.verify.Assert;
 import io.deephaven.chunk.Chunk;
@@ -10,12 +13,14 @@
 import io.deephaven.chunk.ObjectChunk;
 import io.deephaven.chunk.WritableChunk;
 import io.deephaven.chunk.attributes.Values;
+import io.deephaven.configuration.Configuration;
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.rowset.*;
 import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.DataIndex;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.impl.chunkfillers.ChunkFiller;
 import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter;
@@ -34,12 +39,48 @@
 import java.time.Instant;
 import java.time.ZonedDateTime;
+import java.util.Arrays;
 import java.util.Collections;

 public abstract class AbstractColumnSource<DATA_TYPE> implements ColumnSource<DATA_TYPE>, DefaultChunkSource.WithPrev<Values> {

+ /**
+ * For a {@link #match(boolean, boolean, boolean, DataIndex, RowSet, Object...)} call that uses a DataIndex, by
+ * default we do not force the entire DataIndex to be loaded into memory. This is because many
+ * {@link io.deephaven.engine.table.impl.select.MatchFilter}s are highly selective and only need to instantiate a
+ * single RowSet value rather than the complete DataIndex for the entire table. When the Configuration property
+ * "AbstractColumnSource.usePartialDataIndex" is set to false, the query engine materializes the entire DataIndex
+ * table for the match call.
+ */
+ public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance()
+ .getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true);
+ /**
+ * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result RowSet
+ * can be built in serial or in parallel. By default, the index is built in parallel, which may take advantage of
+ * using more threads for I/O of the index data structure. Parallel builds do require more setup and thread
+ * synchronization, so they can be disabled by setting the Configuration property
+ * "AbstractColumnSource.useParallelRowSetBuild" to false.
+ */
+ public static boolean USE_PARALLEL_ROWSET_BUILD = Configuration.getInstance()
+ .getBooleanWithDefault("AbstractColumnSource.useParallelRowSetBuild", true);
+
+ /**
+ * Duration of match() calls using a DataIndex (also provides the count).
+ */
+ private static final Value INDEX_FILTER_MILLIS =
+ Stats.makeItem("AbstractColumnSource", "indexFilterMillis", ThreadSafeCounter.FACTORY,
+ "Duration of match() with a DataIndex in millis")
+ .getValue();
+ /**
+ * Duration of match() calls using a chunk filter (i.e. no DataIndex).
+ */
+ private static final Value CHUNK_FILTER_MILLIS =
+ Stats.makeItem("AbstractColumnSource", "chunkFilterMillis", ThreadSafeCounter.FACTORY,
+ "Duration of match() without a DataIndex in millis")
+ .getValue();
+
 /**
 * Minimum average run length in an {@link RowSequence} that should trigger {@link Chunk}-filling by key ranges
 * instead of individual keys.
 */
@@ -115,82 +156,134 @@ public WritableRowSet match(
 final boolean usePrev,
 final boolean caseInsensitive,
 @Nullable final DataIndex dataIndex,
- @NotNull final RowSet mapper,
+ @NotNull final RowSet rowsetToFilter,
 final Object...
keys) { + if (dataIndex == null) { + return doChunkFilter(invertMatch, usePrev, caseInsensitive, rowsetToFilter, keys); + } + final long t0 = System.nanoTime(); + try { + return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys); + } finally { + final long t1 = System.nanoTime(); + INDEX_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); + } + } - if (dataIndex != null) { - final Table indexTable = dataIndex.table(); - final RowSet matchingIndexRows; - if (caseInsensitive && type == String.class) { - // Linear scan through the index table, accumulating index row keys for case-insensitive matches - final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential(); - - // noinspection rawtypes - final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey()); - // noinspection unchecked - Collections.addAll(keySet, keys); - - final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet(); - final ColumnSource indexKeySource = - indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class); - - final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size()); - try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator(); - final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) { - while (indexRowSetIterator.hasMore()) { - final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize); - final ObjectChunk chunkKeys = (usePrev - ? indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows) - : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk(); - final LongChunk chunkRowKeys = chunkIndexRows.asRowKeyChunk(); - final int thisChunkSize = chunkKeys.size(); - for (int ii = 0; ii < thisChunkSize; ++ii) { - final String key = chunkKeys.get(ii); - if (keySet.containsKey(key)) { - matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii)); - } + private WritableRowSet doDataIndexFilter(final boolean invertMatch, + final boolean usePrev, + final boolean caseInsensitive, + @NotNull final DataIndex dataIndex, + @NotNull final RowSet rowsetToFilter, + final Object[] keys) { + final DataIndexOptions partialOption = + USE_PARTIAL_TABLE_DATA_INDEX ? DataIndexOptions.USING_PARTIAL_TABLE : DataIndexOptions.DEFAULT; + + final Table indexTable = dataIndex.table(partialOption); + + final RowSet matchingIndexRows; + if (caseInsensitive && type == String.class) { + // Linear scan through the index table, accumulating index row keys for case-insensitive matches + final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential(); + + // noinspection rawtypes + final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey()); + // noinspection unchecked + Collections.addAll(keySet, keys); + + final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet(); + final ColumnSource indexKeySource = + indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class); + + final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size()); + try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator(); + final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) { + while (indexRowSetIterator.hasMore()) { + final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize); + final ObjectChunk chunkKeys = (usePrev + ? 
indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows) + : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk(); + final LongChunk chunkRowKeys = chunkIndexRows.asRowKeyChunk(); + final int thisChunkSize = chunkKeys.size(); + for (int ii = 0; ii < thisChunkSize; ++ii) { + final String key = chunkKeys.get(ii); + if (keySet.containsKey(key)) { + matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii)); } } } - matchingIndexRows = matchingIndexRowsBuilder.build(); - } else { - // Use the lookup function to get the index row keys for the matching keys - final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom(); - - final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup(); - for (Object key : keys) { - final long rowKey = rowKeyLookup.apply(key, usePrev); - if (rowKey != RowSequence.NULL_ROW_KEY) { - matchingIndexRowsBuilder.addKey(rowKey); - } + } + matchingIndexRows = matchingIndexRowsBuilder.build(); + } else { + // Use the lookup function to get the index row keys for the matching keys + final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom(); + + final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup(partialOption); + for (final Object key : keys) { + final long rowKey = rowKeyLookup.apply(key, usePrev); + if (rowKey != RowSequence.NULL_ROW_KEY) { + matchingIndexRowsBuilder.addKey(rowKey); } - matchingIndexRows = matchingIndexRowsBuilder.build(); } + matchingIndexRows = matchingIndexRowsBuilder.build(); + } - try (final SafeCloseable ignored = matchingIndexRows) { - final WritableRowSet filtered = invertMatch ? mapper.copy() : RowSetFactory.empty(); - if (matchingIndexRows.isNonempty()) { - final ColumnSource indexRowSetSource = usePrev - ? dataIndex.rowSetColumn().getPrevSource() - : dataIndex.rowSetColumn(); + try (final SafeCloseable ignored = matchingIndexRows) { + final WritableRowSet filtered = invertMatch ? rowsetToFilter.copy() : RowSetFactory.empty(); + if (matchingIndexRows.isNonempty()) { + final ColumnSource indexRowSetSource = usePrev + ? 
dataIndex.rowSetColumn(partialOption).getPrevSource() + : dataIndex.rowSetColumn(partialOption); + + if (USE_PARALLEL_ROWSET_BUILD) { + final long[] rowKeyArray = new long[matchingIndexRows.intSize()]; + matchingIndexRows.toRowKeyArray(rowKeyArray); + Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> { + final RowSet matchingRowSet = indexRowSetSource.get(rowKey); + assert matchingRowSet != null; + if (invertMatch) { + synchronized (filtered) { + filtered.remove(matchingRowSet); + } + } else { + try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) { + synchronized (filtered) { + filtered.insert(intersected); + } + } + } + }); + } else { try (final CloseableIterator matchingIndexRowSetIterator = ChunkedColumnIterator.make(indexRowSetSource, matchingIndexRows)) { matchingIndexRowSetIterator.forEachRemaining((final RowSet matchingRowSet) -> { if (invertMatch) { filtered.remove(matchingRowSet); } else { - try (final RowSet intersected = matchingRowSet.intersect(mapper)) { + try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) { filtered.insert(intersected); } } }); } } - return filtered; } - } else { - return ChunkFilter.applyChunkFilter(mapper, this, usePrev, + return filtered; + } + } + + private WritableRowSet doChunkFilter(final boolean invertMatch, + final boolean usePrev, + final boolean caseInsensitive, + @NotNull final RowSet rowsetToFilter, + final Object[] keys) { + final long t0 = System.nanoTime(); + try { + return ChunkFilter.applyChunkFilter(rowsetToFilter, this, usePrev, ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys)); + } finally { + final long t1 = System.nanoTime(); + CHUNK_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java index 57af02c692f..cebd69e1365 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java @@ -59,6 +59,10 @@ int tableSize(final long expectedEntries) { @Nullable DataIndex dataIndexToUse(Table table, ColumnSource[] sources) { + // Configuration property that serves as an escape hatch + if (!QueryTable.USE_DATA_INDEX_FOR_JOINS) { + return null; + } final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet()); return indexer == null ? null : LivenessScopeStack.computeEnclosed( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index a431891f94d..ff642fe80a6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -198,12 +198,27 @@ public interface MemoizableOperation ChunkedOperatorAggregationHelper.aggregation( + () -> ChunkedOperatorAggregationHelper.aggregation(aggregationControl, aggregationContextFactory, this, preserveEmpty, initialGroups, groupByColumns)); } } @@ -1237,13 +1244,14 @@ private void initializeAndPrioritizeFilters(@NotNull final WhereFilter... 
filter final int numFilters = filters.length; final BitSet priorityFilterIndexes = new BitSet(numFilters); - final QueryCompilerRequestProcessor.BatchProcessor compilationProcesor = QueryCompilerRequestProcessor.batch(); + final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch(); + // Initialize our filters immediately so we can examine the columns they use. Note that filter // initialization is safe to invoke repeatedly. for (final WhereFilter filter : filters) { - filter.init(getDefinition(), compilationProcesor); + filter.init(getDefinition(), compilationProcessor); } - compilationProcesor.compile(); + compilationProcessor.compile(); for (int fi = 0; fi < numFilters; ++fi) { final WhereFilter filter = filters[fi]; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java index 95311871f96..b7f960eb5f8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java @@ -6,6 +6,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.DataIndex; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.indexer.DataIndexer; import org.jetbrains.annotations.NotNull; @@ -91,14 +92,14 @@ public List keyColumnNames() { @Override @NotNull - public Table table() { - return sourceIndex.table(); + public Table table(final DataIndexOptions options) { + return sourceIndex.table(options); } @Override @NotNull - public RowKeyLookup rowKeyLookup() { - return sourceIndex.rowKeyLookup(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + return sourceIndex.rowKeyLookup(options); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java index b496c96171b..cf4eafbf9c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java @@ -6,6 +6,7 @@ import io.deephaven.engine.liveness.LivenessArtifact; import io.deephaven.engine.table.BasicDataIndex; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import org.jetbrains.annotations.NotNull; @@ -63,7 +64,7 @@ public String rowSetColumnName() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { return table; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java index 4802143a9a1..d2a9d8659a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java @@ -9,6 +9,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.ColumnSource; +import 
io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.by.*; @@ -83,7 +84,7 @@ public Map, String> keyColumnNamesByIndexedColumn() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { Table localIndexTable; if ((localIndexTable = indexTable) != null) { return localIndexTable; @@ -136,8 +137,8 @@ private Table computeTable() { @Override @NotNull - public RowKeyLookup rowKeyLookup() { - table(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + table(options); return (final Object key, final boolean usePrev) -> { // Pass the object to the aggregation lookup, then return the resulting row key. This index will be // correct in prev or current space because of the aggregation's hash-based lookup. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java index 7759adcaa49..c9f923566b4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java @@ -10,11 +10,7 @@ import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; -import io.deephaven.engine.table.BasicDataIndex; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.DataIndex; -import io.deephaven.engine.table.DataIndexTransformer; -import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.FunctionalColumn; @@ -72,7 +68,7 @@ public String rowSetColumnName() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { Table localIndexTable; if ((localIndexTable = indexTable) != null) { return localIndexTable; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index 431d00ee7e7..356bd05f8b7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -191,7 +191,10 @@ public Runnable createUpdateHandler( // If we have shifts, that makes everything nasty; so we do not want to deal with it final boolean hasShifts = upstream.shifted().nonempty(); - final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() + // liveResultOwner is only null when we are static; in the static case there is no need to + // worry about serial table operation checking + final boolean serialTableOperationsSafe = liveResultOwner == null + || updateGraph.serialTableOperationsSafe() || updateGraph.sharedLock().isHeldByCurrentThread() || updateGraph.exclusiveLock().isHeldByCurrentThread(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 
881e8a21e88..76d305b8bcc 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java
@@ -4,17 +4,20 @@
 package io.deephaven.engine.table.impl.sources.regioned;

 import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.api.ColumnName;
+import io.deephaven.api.Selectable;
+import io.deephaven.base.stats.Stats;
+import io.deephaven.base.stats.ThreadSafeCounter;
+import io.deephaven.base.stats.Value;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.base.verify.Require;
+import io.deephaven.configuration.Configuration;
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.rowset.RowSequence;
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.rowset.RowSetBuilderSequential;
 import io.deephaven.engine.rowset.RowSetFactory;
-import io.deephaven.engine.table.BasicDataIndex;
-import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.engine.table.PartitionedTableFactory;
-import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.*;
 import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
 import io.deephaven.engine.table.impl.by.AggregationProcessor;
 import io.deephaven.engine.table.impl.by.AggregationRowLookup;
@@ -23,14 +26,16 @@
 import io.deephaven.engine.table.impl.locations.TableLocation;
 import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
+import io.deephaven.engine.table.impl.select.MultiSourceFunctionalColumn;
 import io.deephaven.engine.table.impl.select.SelectColumn;
-import io.deephaven.util.SafeCloseable;
 import io.deephaven.util.annotations.InternalUseOnly;
 import io.deephaven.vector.ObjectVector;
 import org.jetbrains.annotations.NotNull;

 import java.util.*;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;

 /**
 * DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a
@@ -45,6 +50,23 @@
 */
 @InternalUseOnly
 class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {
+ /**
+ * The duration in millis to build a DataIndex table.
+ */
+ private static final Value BUILD_INDEX_TABLE_MILLIS = Stats
+ .makeItem("MergedDataIndex", "buildTableMillis", ThreadSafeCounter.FACTORY,
+ "Duration in millis of building an index")
+ .getValue();
+
+ /**
+ * When merging row sets from multiple component DataIndex structures, reading each individual component can be
+ * performed in parallel to improve the wall-clock I/O time observed. For cases where the number of individual
+ * groups to merge is small and I/O bound this can be very beneficial. Setting up the parallelism, however, can add
+ * additional overhead. Setting the Configuration property "MergedDataIndex.useParallelLazyFetch" to false disables
+ * this behavior and each key's merged RowSet is computed serially.
+ */
+ public static boolean USE_PARALLEL_LAZY_FETCH = Configuration.getInstance()
+ .getBooleanWithDefault("MergedDataIndex.useParallelLazyFetch", true);

 private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable";

@@ -63,6 +85,17 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl
 */
 private volatile Table indexTable;

+ /**
+ * A lazy version of the table.
This value is never set if indexTable is set.
+ */
+ private volatile Table lazyTable;
+
+ /**
+ * If we have a lazy version of the table, we hold onto the partitioned table, which permits us to select the
+ * underlying rowsets without needing to reselect the key columns.
+ */
+ private volatile PartitionedTable lazyPartitionedTable;
+
 /**
 * Whether this index is known to be corrupt.
 */
@@ -113,19 +146,29 @@ public Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn() {
 @Override
 @NotNull
- public Table table() {
+ public Table table(final DataIndexOptions options) {
 Table localIndexTable;
 if ((localIndexTable = indexTable) != null) {
 return localIndexTable;
 }
+ final boolean lazyRowSetMerge = options.operationUsesPartialTable();
+ if (lazyRowSetMerge) {
+ if ((localIndexTable = lazyTable) != null) {
+ return localIndexTable;
+ }
+ }
 synchronized (this) {
 if ((localIndexTable = indexTable) != null) {
 return localIndexTable;
+ } else if (lazyRowSetMerge) {
+ if ((localIndexTable = lazyTable) != null) {
+ return localIndexTable;
+ }
 }
 try {
 return QueryPerformanceRecorder.withNugget(
 String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)),
- ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
+ ForkJoinPoolOperationInitializer.ensureParallelizable(() -> buildTable(lazyRowSetMerge)));
 } catch (Throwable t) {
 isCorrupt = true;
 throw t;
@@ -133,78 +176,262 @@ public Table table() {
 }
 }

- private Table buildTable() {
- final Table locationTable = columnSourceManager.locationTable().coalesce();
+ /**
+ * The RowSetCacher is a bit of a hack that allows us to avoid reading actual RowSets from disk until they are
+ * actually required for a query operation. We are breaking engine rules in that we reference the source
+ * ColumnSource directly and do not have correct dependencies encoded in a select. MergedDataIndexes are only
+ * permitted for a static table, so we can get away with this.
+ *
+ * <p>
+ * Once a RowSet has been written, we write down the result into an AtomicReferenceArray so that it need not be read
+ * repeatedly. If two threads attempt to concurrently fetch the same RowSet, then we use a placeholder object in the
+ * array to avoid duplicated work.
+ * </p>
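+ *
+ * <p>
+ * The core idiom, sketched in isolation (editorial example; {@code compute} stands in for the merge performed by
+ * the real implementation below, which additionally caches failures):
+ * </p>
+ *
+ * <pre>{@code
+ * final Object local = results.get(i);
+ * if (local instanceof RowSet) {
+ *     return (RowSet) local; // already computed and published
+ * }
+ * final Object placeholder = new Object();
+ * synchronized (placeholder) {
+ *     if (!results.compareAndSet(i, null, placeholder)) {
+ *         // lost the race: loop again; blocking on the winner's placeholder provides the barrier
+ *     } else {
+ *         results.set(i, compute(i)); // winner computes once, then publishes for everyone
+ *     }
+ * }
+ * }</pre>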
+ */
+ private static class RowSetCacher {
+ final ColumnSource<ObjectVector<RowSet>> source;
+ final AtomicReferenceArray<Object> results;

- // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by
- // the appropriate region offset.
- // This potentially loads many small row sets into memory, but it avoids the risk of re-materializing row set
- // pages during the accumulation phase.
- final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
- final Table locationDataIndexes = locationTable
- .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
- columnSourceManager.locationColumnName(), TableLocation.class,
- LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class,
- (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets(
- locationRowKey, location, keyColumnNamesArray)))))
- .dropColumns(columnSourceManager.locationColumnName());
+ private RowSetCacher(final ColumnSource<ObjectVector<RowSet>> source, final int capacity) {
+ this.source = source;
+ this.results = new AtomicReferenceArray<>(capacity);
+ }

- // Merge all the location index tables into a single table
- final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge();
+ RowSet get(final long rowKey) {
+ if (rowKey < 0 || rowKey >= results.length()) {
+ return null;
+ }

- // Group the merged data indexes by the keys
- final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray);
+ final int iRowKey = (int) rowKey;
+ do {
+ final Object localResult = results.get(iRowKey);
+ if (localResult instanceof RowSet) {
+ return (RowSet) localResult;
+ }
+ if (localResult instanceof Exception) {
+ throw new UncheckedDeephavenException("Exception found for cached RowSet",
+ (Exception) localResult);
+ }

- // Combine the row sets from each group into a single row set
- final Table combined = groupedByKeyColumns
- .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
+ if (localResult != null) {
+ // noinspection EmptySynchronizedStatement
+ synchronized (localResult) {
+ // don't care to do anything, we are just waiting for the barrier to be done
+ }
+ continue;
+ }
+
+ // we need to create our own placeholder, and synchronize on it first
+ final Object placeholder = new Object();
+ // noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (placeholder) {
+ if (!results.compareAndSet(iRowKey, null, placeholder)) {
+ // we must try again, someone else has claimed the placeholder
+ continue;
+ }
+ // We won the race, so it is our responsibility to get the right answer
+
+ final ObjectVector<RowSet> inputRowSets = source.get(rowKey);
+ Assert.neqNull(inputRowSets, "inputRowSets");
+ final RowSet computedResult;
+ try {
+ if (USE_PARALLEL_LAZY_FETCH) {
+ // noinspection DataFlowIssue
+ computedResult = mergeRowSetsParallel(rowKey, inputRowSets);
+ } else {
+ // noinspection DataFlowIssue
+ computedResult = mergeRowSetsSerial(rowKey, inputRowSets);
+ }
+ } catch (Exception e) {
+ results.set(iRowKey, e);
+ throw e;
+ }
+ results.set(iRowKey, computedResult);
+ return computedResult;
+ }
+ } while (true);
+ }
+ }
+
+ private Table buildTable(final boolean lazyRowSetMerge) {
+ if (lazyTable != null) {
+ if (lazyRowSetMerge) {
+ return lazyTable;
+ }
+ }
+
+ final long t0 = System.nanoTime();
+ try {
+ final PartitionedTable partitionedTable;
+ if (lazyPartitionedTable != null) {
+ // We are synchronized, and can begin processing from the PartitionedTable rather than starting from
+ // scratch.
The first step is to force our rowsets into memory, in parallel.
+ partitionedTable = lazyPartitionedTable.transform(t -> ForkJoinPoolOperationInitializer
+ .ensureParallelizable(() -> t.update(ROW_SET_COLUMN_NAME)).get());
+ } else {
+ partitionedTable = buildPartitionedTable(lazyRowSetMerge);
+ }
+
+ // Merge all the location index tables into a single table
+ final Table mergedDataIndexes = partitionedTable.merge();
+ // Group the merged data indexes by the keys
+ final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNames.toArray(String[]::new));
+ lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns);
+
+ final Table combined;
+ if (lazyRowSetMerge) {
+ final ColumnSource<ObjectVector<RowSet>> vectorColumnSource =
+ groupedByKeyColumns.getColumnSource(ROW_SET_COLUMN_NAME);
+
+ // we are using a regular array here; so we must ensure that we are flat; or we'll have a bad time
+ Assert.assertion(groupedByKeyColumns.isFlat(), "groupedByKeyColumns.isFlat()");
+ final RowSetCacher rowsetCacher = new RowSetCacher(vectorColumnSource, groupedByKeyColumns.intSize());
+ combined = groupedByKeyColumns
+ .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(),
+ ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k)))));
+
+ lazyPartitionedTable = partitionedTable;
+ lazyTable = combined;
+ } else {
+ // Combine the row sets from each group into a single row set
+ final List<SelectColumn> mergeFunction = List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
 ROW_SET_COLUMN_NAME, ObjectVector.class,
 ROW_SET_COLUMN_NAME, RowSet.class,
- this::mergeRowSets))));
- Assert.assertion(combined.isFlat(), "combined.isFlat()");
- Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");
+ MergedDataIndex::mergeRowSetsSerial)));
+
+ combined = groupedByKeyColumns.update(mergeFunction);
+
+ indexTable = combined;
+ // if we were built off a lazy table, null it out now that we've setup the indexTable
+ lazyPartitionedTable = null;
+ lazyTable = null;
+ }
+ Assert.assertion(combined.isFlat(), "combined.isFlat()");
+ Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");

- // Cleanup after ourselves
- try (final CloseableIterator<RowSet> rowSets = mergedDataIndexes.objectColumnIterator(ROW_SET_COLUMN_NAME)) {
- rowSets.forEachRemaining(SafeCloseable::close);
+ return combined;
+ } finally {
+ final long t1 = System.nanoTime();
+ BUILD_INDEX_TABLE_MILLIS.sample((t1 - t0) / 1_000_000);
 }
+ }

- lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns);
- indexTable = combined;
- return combined;
+ /**
+ * Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by the
+ * appropriate region offset. The keys are always forced into memory so that the groupBy operation need not read
+ * from disk serially.
+ *
+ * <p>
+ * If lazyRowSetMerge is true, the rowsets are read into memory as part of the mergeRowSets call and are not forced
+ * into memory here. If lazyRowSetMerge is false, then the RowSets are also forced into memory.
+ * </p>
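+ *
+ * <p>
+ * For intuition (editorial sketch with made-up numbers): each location's index stores row sets in location-local
+ * coordinates; shifting by the region's first row key moves them into the address space of the merged table:
+ * </p>
+ *
+ * <pre>{@code
+ * final long shift = RegionedColumnSource.getFirstRowKey(2); // first row key of region 2
+ * final RowSet shifted = localRowSet.shift(shift);           // location-local keys -> table-wide keys
+ * }</pre>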
+ */ + private PartitionedTable buildPartitionedTable(final boolean lazyRowSetMerge) { + final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); + final Table locationTable = columnSourceManager.locationTable().coalesce(); + final Table locationDataIndexes = locationTable + .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( + columnSourceManager.locationColumnName(), TableLocation.class, + LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, + (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( + locationRowKey, location, keyColumnNamesArray, !lazyRowSetMerge))))) + .dropColumns(columnSourceManager.locationColumnName()); + + return PartitionedTableFactory.of(locationDataIndexes); } private static Table loadIndexTableAndShiftRowSets( final long locationRowKey, @NotNull final TableLocation location, - @NotNull final String[] keyColumnNames) { + @NotNull final String[] keyColumnNames, + final boolean selectRowSets) { final BasicDataIndex dataIndex = location.getDataIndex(keyColumnNames); if (dataIndex == null) { throw new UncheckedDeephavenException(String.format("Failed to load data index [%s] for location %s", String.join(", ", keyColumnNames), location)); } final Table indexTable = dataIndex.table(); - return indexTable.coalesce().update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( - dataIndex.rowSetColumnName(), RowSet.class, - ROW_SET_COLUMN_NAME, RowSet.class, - (final RowSet rowSet) -> rowSet - .shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey))))))); + final long shiftAmount = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)); + final Table coalesced = indexTable.coalesce(); + + final Selectable shiftFunction; + if (shiftAmount == 0) { + // A source column would be more convenient, but we are going to close the RowSet after we are done merging + // it and should not allow that close call to pass through to the original table. + shiftFunction = new FunctionalColumn<>( + dataIndex.rowSetColumnName(), RowSet.class, + ROW_SET_COLUMN_NAME, RowSet.class, + RowSet::copy); + } else { + shiftFunction = new FunctionalColumn<>( + dataIndex.rowSetColumnName(), RowSet.class, + ROW_SET_COLUMN_NAME, RowSet.class, + (final RowSet rowSet) -> rowSet.shift(shiftAmount)); + } + if (selectRowSets) { + // pull the key columns into memory while we are parallel; and read all the rowsets (by virtue of the shift) + final List columns = new ArrayList<>(keyColumnNames.length + 1); + Arrays.stream(keyColumnNames).map(ColumnName::of).forEach(columns::add); + columns.add(SelectColumn.ofStateless(shiftFunction)); + return ForkJoinPoolOperationInitializer.ensureParallelizable(() -> coalesced.update(columns)).get(); + } else { + // pull the key columns into memory while we are parallel; but do not read all the RowSets + final Table withInMemoryKeyColumns = + ForkJoinPoolOperationInitializer.ensureParallelizable(() -> coalesced.update(keyColumnNames)).get(); + return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); + } } - private RowSet mergeRowSets( + /** + * The returned RowSet is owned by the caller. The input RowSets are closed. 
+ */ + private static RowSet mergeRowSetsSerial( @SuppressWarnings("unused") final long unusedRowKey, @NotNull final ObjectVector keyRowSets) { + final long numRowSets = keyRowSets.size(); + + if (numRowSets == 1) { + // we steal the reference, the input is never used again + return keyRowSets.get(0); + } + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); try (final CloseableIterator rowSets = keyRowSets.iterator()) { - rowSets.forEachRemaining(builder::appendRowSequence); + rowSets.forEachRemaining(rs -> { + builder.appendRowSequence(rs); + rs.close(); + }); } + + return builder.build(); + } + + /** + * The returned RowSet is owned by the caller. The input RowSets are closed. + */ + private static RowSet mergeRowSetsParallel( + @SuppressWarnings("unused") final long unusedRowKey, + @NotNull final ObjectVector keyRowSets) { + final long numRowSets = keyRowSets.size(); + + if (numRowSets == 1) { + // we steal the reference, the input is never used again + return keyRowSets.get(0); + } + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + + LongStream.range(0, numRowSets).parallel().mapToObj(keyRowSets::get) + .sorted(Comparator.comparingLong(RowSet::firstRowKey)).forEachOrdered(rs -> { + builder.appendRowSequence(rs); + rs.close(); + }); + return builder.build(); } @Override @NotNull - public RowKeyLookup rowKeyLookup() { - table(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + table(options); return (final Object key, final boolean usePrev) -> { // Pass the object to the aggregation lookup, then return the resulting row position (which is also the row // key). @@ -232,13 +459,8 @@ public boolean isValid() { final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); try (final CloseableIterator locations = columnSourceManager.locationTable().objectColumnIterator(columnSourceManager.locationColumnName())) { - while (locations.hasNext()) { - if (!locations.next().hasDataIndex(keyColumnNamesArray)) { - return isValid = false; - } - } + return isValid = locations.stream().parallel().allMatch(l -> l.hasDataIndex(keyColumnNamesArray)); } - return isValid = true; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java index 4e7a9bb1a8d..1953470475e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java @@ -8,6 +8,7 @@ import io.deephaven.base.verify.Require; import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; @@ -300,13 +301,13 @@ public Map, String> keyColumnNamesByIndexedColumn() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { return indexTable; } @Override @NotNull - public RowKeyLookup rowKeyLookup() { + public RowKeyLookup rowKeyLookup(final DataIndexOptions unusedOptions) { return (final Object key, final boolean usePrev) -> keyPositionMap.get(key); } diff --git 
a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java index 0aeb1b05a07..7d3d2e7a08b 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java
@@ -388,6 +388,15 @@ private static void checkIndex(
 @Test
 public void testStaticBasics() {
+ testStaticBasics(DataIndexOptions.DEFAULT);
+ }
+
+ @Test
+ public void testStaticBasicsPartial() {
+ testStaticBasics(DataIndexOptions.USING_PARTIAL_TABLE);
+ }
+
+ private void testStaticBasics(final DataIndexOptions options) {
 SUT = new RegionedColumnSourceManager(false, componentFactory, ColumnToCodecMappings.EMPTY, columnDefinitions);

 assertEquals(makeColumnSourceMap(), SUT.getColumnSources());
@@ -451,7 +460,11 @@ public void testStaticBasics() {
 }

 captureIndexes(SUT.initialize());
- capturedGroupingColumnIndex.table(); // Force us to build the merged index *before* we check satisfaction
+
+ // Force us to build the merged index *before* we check satisfaction.
+ // The checkIndexes method will call table() a second time with the DEFAULT options, which exercises lazy
+ // conversion.
+ capturedGroupingColumnIndex.table(options);
 checkIndexes();

 assertEquals(Arrays.asList(tableLocation1A, tableLocation1B), SUT.includedLocations());
@@ -482,7 +495,7 @@ private DataIndexImpl(@NotNull final Table table) {
 }

 @Override
- public @NotNull Table table() {
+ public @NotNull Table table(DataIndexOptions ignored) {
 return table;
 }