From a15b9984cf1fb3f44b9b1ec3b7bfcef28ab020de Mon Sep 17 00:00:00 2001
From: "Charles P. Wright" <charleswright@deephaven.io>
Date: Fri, 3 Jan 2025 14:19:20 -0500
Subject: [PATCH] DH-18300: Improve DataIndex performance.

DataIndex, particularly when used for where() filters, missed
parallelization opportunities and read more data than strictly necessary
to satisfy the filter.
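
For example, a highly selective match filter over an indexed column can now
be satisfied by fetching only the RowSets of the matched keys.  A minimal
sketch (hypothetical table and column names; assumes a session where an
ExecutionContext is installed):

    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.table.impl.indexer.DataIndexer;
    import io.deephaven.engine.util.TableTools;

    final Table source = TableTools.newTable(
            TableTools.stringCol("Sym", "AAPL", "MSFT", "AAPL"),
            TableTools.intCol("Size", 100, 200, 300));
    DataIndexer.getOrCreateDataIndex(source, "Sym");
    // where() consults the index; with the partial-index path it reads only
    // the RowSet for the matched key rather than the whole index table
    final Table filtered = source.where("Sym in `AAPL`");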

Statistics have been added to various operations.  The existing Value class
was not thread safe; its internal state has been updated to use volatile
fields and AtomicLongFieldUpdaters.
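
The updater pattern keeps the hot sample() path lock-free without allocating
a per-instance AtomicLong for each statistic.  A minimal sketch of the idiom
(illustrative class, not the actual Value implementation):

    import java.util.concurrent.atomic.AtomicLongFieldUpdater;

    class Sampler {
        // the updated field must be a non-static volatile long in this class
        private static final AtomicLongFieldUpdater<Sampler> N_UPDATER =
                AtomicLongFieldUpdater.newUpdater(Sampler.class, "n");
        protected volatile long n = 0;

        void sample() {
            // atomic read-modify-write through the shared static updater
            N_UPDATER.incrementAndGet(this);
        }
    }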

The following Configuration properties have been added (example overrides
follow the list):
- AbstractColumnSource.usePartialDataIndex
- AbstractColumnSource.useParallelIndexBuild
- QueryTable.useDataIndexForAggregation
- MergedDataIndex.useParallelLazyFetch
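
All of these default to true.  They are read through Configuration, so
(assuming the usual system-property override) the previous eager/serial
behavior can be restored for comparison with, e.g.:

    -DAbstractColumnSource.usePartialDataIndex=false
    -DAbstractColumnSource.useParallelIndexBuild=false
    -DMergedDataIndex.useParallelLazyFetch=false

Setting QueryTable.useDataIndexForAggregation=false additionally prevents
aggregations from using a data index even when one is present.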
---
 .../java/io/deephaven/base/stats/Value.java   |  48 ++-
 CONTRIBUTING.md                               |   5 +
 .../engine/table/BasicDataIndex.java          |  31 +-
 .../io/deephaven/engine/table/DataIndex.java  |   7 +-
 .../engine/table/DataIndexOptions.java        |  65 ++++
 .../table/impl/AbstractColumnSource.java      | 209 +++++++++----
 .../engine/table/impl/QueryTable.java         |  22 +-
 .../impl/dataindex/RemappedDataIndex.java     |   9 +-
 .../impl/dataindex/StandaloneDataIndex.java   |   3 +-
 .../impl/dataindex/TableBackedDataIndex.java  |   7 +-
 .../impl/dataindex/TransformedDataIndex.java  |   8 +-
 .../select/analyzers/SelectColumnLayer.java   |   5 +-
 .../sources/regioned/MergedDataIndex.java     | 289 ++++++++++++++----
 .../regioned/PartitioningColumnDataIndex.java |   5 +-
 .../TestRegionedColumnSourceManager.java      |   2 +-
 15 files changed, 567 insertions(+), 148 deletions(-)
 create mode 100644 engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java

diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java
index cb8c098b9b4..65ee41424a1 100644
--- a/Base/src/main/java/io/deephaven/base/stats/Value.java
+++ b/Base/src/main/java/io/deephaven/base/stats/Value.java
@@ -3,14 +3,22 @@
 //
 package io.deephaven.base.stats;
 
-public abstract class Value {
+import java.text.DecimalFormat;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-    protected long n = 0;
-    protected long last = 0;
-    protected long sum = 0;
-    protected long sum2 = 0;
-    protected long max = Long.MIN_VALUE;
-    protected long min = Long.MAX_VALUE;
+public abstract class Value {
+    private static final AtomicLongFieldUpdater<Value> N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n");
+    private static final AtomicLongFieldUpdater<Value> SUM_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(Value.class, "sum");
+    private static final AtomicLongFieldUpdater<Value> SUM2_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(Value.class, "sum2");
+
+    protected volatile long n = 0;
+    protected volatile long last = 0;
+    protected volatile long sum = 0;
+    protected volatile long sum2 = 0;
+    protected volatile long max = Long.MIN_VALUE;
+    protected volatile long min = Long.MAX_VALUE;
 
     private boolean alwaysUpdated = false;
 
@@ -52,10 +60,11 @@ protected Value(History history) {
     }
 
     public void sample(long x) {
-        n++;
+        N_UPDATER.incrementAndGet(this);
+        SUM_UPDATER.addAndGet(this, x);
+        SUM2_UPDATER.addAndGet(this, x * x);
         last = x;
-        sum += x;
-        sum2 += x * x;
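+        // last, min, and max remain plain volatile writes; under concurrent sampling they are best-effort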
         if (x > max) {
             max = x;
         }
@@ -99,4 +107,23 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon
             }
         }
     }
+
+    @Override
+    public String toString() {
+        final DecimalFormat format = new DecimalFormat("#,###");
+        final DecimalFormat avgFormat = new DecimalFormat("#,###.###");
+
+        final double variance = n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN;
+
+        return "Value{" +
+                "n=" + format.format(n) +
+                (n > 0 ? ", sum=" + format.format(sum) +
+                        ", max=" + format.format(max) +
+                        ", min=" + format.format(min) +
+                        ", avg=" + avgFormat.format((n > 0 ? (double) sum / n : Double.NaN)) +
+                        ", std=" + avgFormat.format(Math.sqrt(variance))
+                        : "")
+                +
+                '}';
+    }
 }
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 39aea6d0072..b1f17cddfcf 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -54,6 +54,11 @@ For more information, see:
 * [gh pr create](https://cli.github.com/manual/gh_pr_create)
 * [CLI In Use](https://cli.github.com/manual/examples.html)
 
+## Labels
+
+- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes.  Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
+- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated.  Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label.
+
 ## Styleguide
 The [styleguide](style/README.md) is applied globally to the entire project, except for generated code that gets checked in.
 To apply the styleguide, run `./gradlew spotlessApply`.
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
index 13ff41aa000..f1f09f096be 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java
@@ -96,16 +96,43 @@ default ColumnSource<?>[] keyColumns(@NotNull final ColumnSource<?>[] indexedCol
     @FinalDefault
     @NotNull
     default ColumnSource<RowSet> rowSetColumn() {
-        return table().getColumnSource(rowSetColumnName(), RowSet.class);
+        return rowSetColumn(DataIndexOptions.DEFAULT);
+    }
+
+    /**
+     * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}.
+     * @param options parameters that control how the underlying index table is fetched
+     * @return The {@link RowSet} {@link ColumnSource}
+     */
+    @FinalDefault
+    @NotNull
+    default ColumnSource<RowSet> rowSetColumn(final DataIndexOptions options) {
+        return table(options).getColumnSource(rowSetColumnName(), RowSet.class);
     }
 
     /**
      * Get the {@link Table} backing this data index.
+     *
+     * <p>
+     * The returned table is fully in-memory, equivalent to {@link #table(DataIndexOptions)} with default options.
+     * </p>
      * 
      * @return The {@link Table}
      */
     @NotNull
-    Table table();
+    default Table table() {
+        return table(DataIndexOptions.DEFAULT);
+    }
+
+    /**
+     * Get the {@link Table} backing this data index.
+     *
+     * @param options parameters to control the returned table
+     *
+     * @return The {@link Table}
+     */
+    @NotNull
+    Table table(DataIndexOptions options);
 
     /**
      * Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force
@@ -115,4 +142,4 @@ default ColumnSource<RowSet> rowSetColumn() {
      *         otherwise
      */
     boolean isRefreshing();
 }
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java
index b257bdbc8d0..3dbdc8eeca7 100644
--- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java
+++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java
@@ -48,7 +48,12 @@ interface RowKeyLookup {
      * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
      */
     @NotNull
-    RowKeyLookup rowKeyLookup();
+    default RowKeyLookup rowKeyLookup() {
+        return rowKeyLookup(DataIndexOptions.DEFAULT);
+    }
+
+    @NotNull
+    RowKeyLookup rowKeyLookup(DataIndexOptions options);
 
     /**
      * Return a {@link RowKeyLookup lookup function} function of index row keys for this index. If
diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java
new file mode 100644
index 00000000000..becd0948b2c
--- /dev/null
+++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java
@@ -0,0 +1,65 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.engine.table;
+
+import io.deephaven.annotations.BuildableStyle;
+import io.deephaven.api.filter.Filter;
+import org.immutables.value.Value;
+
+/**
+ * Options for controlling the function of a {@link DataIndex}.
+ *
+ * <p>
+ * Presently, this is used for the {@link Table#where(Filter)} operation to more efficiently handle data index matches,
+ * without necessarily reading all RowSet information from disk across partitions.
+ * </p>
+ */
+@Value.Immutable
+@BuildableStyle
+public interface DataIndexOptions {
+    DataIndexOptions DEFAULT = DataIndexOptions.builder().build();
+
+    /**
+     * Does this operation use only a subset of the DataIndex?
+     *
+     * <p>
+     * The DataIndex implementation may use this hint to defer work for some row sets.
+     * </p>
+     *
+     * @return whether this operation is only going to use a subset of this data index
+     */
+    @Value.Default
+    default boolean operationUsesPartialTable() {
+        return false;
+    }
+
+    /**
+     * Create a new builder for a {@link DataIndexOptions}.
+     * 
+     * @return a new {@link DataIndexOptions} builder
+     */
+    static Builder builder() {
+        return ImmutableDataIndexOptions.builder();
+    }
+
+    /**
+     * The builder interface to construct a {@link DataIndexOptions}.
+     */
+    interface Builder {
+        /**
+         * Set whether this operation only uses a subset of the data index.
+         *
+         * @param usesPartialTable true if this operation only uses a partial table
+         * @return this builder
+         */
+        Builder operationUsesPartialTable(boolean usesPartialTable);
+
+        /**
+         * Build the {@link DataIndexOptions}.
+         * 
+         * @return an immutable DataIndexOptions structure.
+         */
+        DataIndexOptions build();
+    }
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java
index 65d434d6598..eae1bf0cb5b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java
@@ -3,6 +3,9 @@
 //
 package io.deephaven.engine.table.impl;
 
+import io.deephaven.base.stats.Counter;
+import io.deephaven.base.stats.Stats;
+import io.deephaven.base.stats.Value;
 import io.deephaven.base.string.cache.CharSequenceUtils;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.chunk.Chunk;
@@ -10,12 +13,14 @@
 import io.deephaven.chunk.ObjectChunk;
 import io.deephaven.chunk.WritableChunk;
 import io.deephaven.chunk.attributes.Values;
+import io.deephaven.configuration.Configuration;
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.rowset.*;
 import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.DataIndex;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.impl.chunkfillers.ChunkFiller;
 import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter;
@@ -34,12 +39,48 @@
 
 import java.time.Instant;
 import java.time.ZonedDateTime;
+import java.util.Arrays;
 import java.util.Collections;
 
 public abstract class AbstractColumnSource<T> implements
         ColumnSource<T>,
         DefaultChunkSource.WithPrev<Values> {
 
+    /**
+     * For a {@link #match(boolean, boolean, boolean, DataIndex, RowSet, Object...)} call that uses a DataIndex, by
+     * default we do not force the entire DataIndex to be loaded into memory. This is because many
+     * {@link io.deephaven.engine.table.impl.select.MatchFilter}s are highly selective and only need to instantiate a
+     * single RowSet value rather than the complete DataIndex for the entire table. When the Configuration property
+     * "AbstractColumnSource.usePartialDataIndex" is set to false, the query engine materializes the entire DataIndex
+     * table for the match call.
+     */
+    public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance()
+            .getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true);
+    /**
+     * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result
+     * Index can be built serially or in parallel. By default, the index is built in parallel, which can use more
+     * threads for I/O of the index data structure. Parallel builds do require more setup and thread
+     * synchronization, so they can be disabled by setting the Configuration property
+     * "AbstractColumnSource.useParallelIndexBuild" to false.
+     */
+    public static boolean USE_PARALLEL_INDEX_BUILD = Configuration.getInstance()
+            .getBooleanWithDefault("AbstractColumnSource.useParallelIndexBuild", true);
+
+    /**
+     * Duration of match() calls using a DataIndex (also provides the count).
+     */
+    public static final Value indexFilter =
+            Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY,
+                    "Duration of match() with a DataIndex in nanos")
+                    .getValue();
+    /**
+     * Duration of match() calls using a chunk filter (i.e. no DataIndex).
+     */
+    public static final Value chunkFilter =
+            Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY,
+                    "Duration of match() without a DataIndex in nanos")
+                    .getValue();
+
     /**
      * Minimum average run length in an {@link RowSequence} that should trigger {@link Chunk}-filling by key ranges
      * instead of individual keys.
@@ -48,6 +89,13 @@ public abstract class AbstractColumnSource<T> implements
 
     private static final int CHUNK_SIZE = 1 << 11;
 
+    /**
+     * The match operation does not use the complete table; it picks out only the relevant indices, so there is no
+     * reason to read the table fully.
+     */
+    private static final DataIndexOptions PARTIAL_TABLE_DATA_INDEX =
+            DataIndexOptions.builder().operationUsesPartialTable(true).build();
+
     protected final Class<T> type;
     protected final Class<?> componentType;
 
@@ -115,82 +163,135 @@ public WritableRowSet match(
             final boolean usePrev,
             final boolean caseInsensitive,
             @Nullable final DataIndex dataIndex,
-            @NotNull final RowSet mapper,
+            @NotNull final RowSet rowsetToFilter,
             final Object... keys) {
+        if (dataIndex == null) {
+            return doChunkFilter(invertMatch, usePrev, caseInsensitive, rowsetToFilter, keys);
+        }
+        final long t0 = System.nanoTime();
+        try {
+            return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys);
+        } finally {
+            final long t1 = System.nanoTime();
+            indexFilter.sample(t1 - t0);
+        }
+    }
 
-        if (dataIndex != null) {
-            final Table indexTable = dataIndex.table();
-            final RowSet matchingIndexRows;
-            if (caseInsensitive && type == String.class) {
-                // Linear scan through the index table, accumulating index row keys for case-insensitive matches
-                final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential();
-
-                // noinspection rawtypes
-                final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey());
-                // noinspection unchecked
-                Collections.addAll(keySet, keys);
-
-                final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet();
-                final ColumnSource<?> indexKeySource =
-                        indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class);
-
-                final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size());
-                try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator();
-                        final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) {
-                    while (indexRowSetIterator.hasMore()) {
-                        final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize);
-                        final ObjectChunk<String, ? extends Values> chunkKeys = (usePrev
-                                ? indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows)
-                                : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk();
-                        final LongChunk<OrderedRowKeys> chunkRowKeys = chunkIndexRows.asRowKeyChunk();
-                        final int thisChunkSize = chunkKeys.size();
-                        for (int ii = 0; ii < thisChunkSize; ++ii) {
-                            final String key = chunkKeys.get(ii);
-                            if (keySet.containsKey(key)) {
-                                matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii));
-                            }
+    private WritableRowSet doDataIndexFilter(final boolean invertMatch,
+            final boolean usePrev,
+            final boolean caseInsensitive,
+            @NotNull final DataIndex dataIndex,
+            @NotNull final RowSet rowsetToFilter,
+            final Object[] keys) {
+        final DataIndexOptions partialOption =
+                USE_PARTIAL_TABLE_DATA_INDEX ? PARTIAL_TABLE_DATA_INDEX : DataIndexOptions.DEFAULT;
+
+        final Table indexTable = dataIndex.table(partialOption);
+
+        final RowSet matchingIndexRows;
+        if (caseInsensitive && type == String.class) {
+            // Linear scan through the index table, accumulating index row keys for case-insensitive matches
+            final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential();
+
+            // noinspection rawtypes
+            final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey());
+            // noinspection unchecked
+            Collections.addAll(keySet, keys);
+
+            final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet();
+            final ColumnSource<?> indexKeySource =
+                    indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class);
+
+            final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size());
+            try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator();
+                    final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) {
+                while (indexRowSetIterator.hasMore()) {
+                    final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize);
+                    final ObjectChunk<String, ? extends Values> chunkKeys = (usePrev
+                            ? indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows)
+                            : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk();
+                    final LongChunk<OrderedRowKeys> chunkRowKeys = chunkIndexRows.asRowKeyChunk();
+                    final int thisChunkSize = chunkKeys.size();
+                    for (int ii = 0; ii < thisChunkSize; ++ii) {
+                        final String key = chunkKeys.get(ii);
+                        if (keySet.containsKey(key)) {
+                            matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii));
                         }
                     }
                 }
-                matchingIndexRows = matchingIndexRowsBuilder.build();
-            } else {
-                // Use the lookup function to get the index row keys for the matching keys
-                final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom();
-
-                final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup();
-                for (Object key : keys) {
-                    final long rowKey = rowKeyLookup.apply(key, usePrev);
-                    if (rowKey != RowSequence.NULL_ROW_KEY) {
-                        matchingIndexRowsBuilder.addKey(rowKey);
-                    }
+            }
+            matchingIndexRows = matchingIndexRowsBuilder.build();
+        } else {
+            // Use the lookup function to get the index row keys for the matching keys
+            final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom();
+
+            final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup(partialOption);
+            for (final Object key : keys) {
+                final long rowKey = rowKeyLookup.apply(key, usePrev);
+                if (rowKey != RowSequence.NULL_ROW_KEY) {
+                    matchingIndexRowsBuilder.addKey(rowKey);
                 }
-                matchingIndexRows = matchingIndexRowsBuilder.build();
             }
+            matchingIndexRows = matchingIndexRowsBuilder.build();
+        }
 
-            try (final SafeCloseable ignored = matchingIndexRows) {
-                final WritableRowSet filtered = invertMatch ? mapper.copy() : RowSetFactory.empty();
-                if (matchingIndexRows.isNonempty()) {
-                    final ColumnSource<RowSet> indexRowSetSource = usePrev
-                            ? dataIndex.rowSetColumn().getPrevSource()
-                            : dataIndex.rowSetColumn();
+        try (final SafeCloseable ignored = matchingIndexRows) {
+            final WritableRowSet filtered = invertMatch ? rowsetToFilter.copy() : RowSetFactory.empty();
+            if (matchingIndexRows.isNonempty()) {
+                final ColumnSource<RowSet> indexRowSetSource = usePrev
+                        ? dataIndex.rowSetColumn(partialOption).getPrevSource()
+                        : dataIndex.rowSetColumn(partialOption);
+
+                if (USE_PARALLEL_INDEX_BUILD) {
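+                    // multiple pool threads mutate the shared result row set, so writes synchronize on "filtered"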
+                    final long[] rowKeyArray = new long[matchingIndexRows.intSize()];
+                    matchingIndexRows.toRowKeyArray(rowKeyArray);
+                    Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> {
+                        final RowSet matchingRowSet = indexRowSetSource.get(rowKey);
+                        assert matchingRowSet != null;
+                        if (invertMatch) {
+                            synchronized (filtered) {
+                                filtered.remove(matchingRowSet);
+                            }
+                        } else {
+                            try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) {
+                                synchronized (filtered) {
+                                    filtered.insert(intersected);
+                                }
+                            }
+                        }
+                    });
+                } else {
                     try (final CloseableIterator<RowSet> matchingIndexRowSetIterator =
                             ChunkedColumnIterator.make(indexRowSetSource, matchingIndexRows)) {
                         matchingIndexRowSetIterator.forEachRemaining((final RowSet matchingRowSet) -> {
                             if (invertMatch) {
                                 filtered.remove(matchingRowSet);
                             } else {
-                                try (final RowSet intersected = matchingRowSet.intersect(mapper)) {
+                                try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) {
                                     filtered.insert(intersected);
                                 }
                             }
                         });
                     }
                 }
-                return filtered;
             }
-        } else {
-            return ChunkFilter.applyChunkFilter(mapper, this, usePrev,
+            return filtered;
+        }
+    }
+
+    private WritableRowSet doChunkFilter(final boolean invertMatch,
+            final boolean usePrev,
+            final boolean caseInsensitive,
+            @NotNull final RowSet rowsetToFilter,
+            final Object[] keys) {
+        final long t0 = System.nanoTime();
+        try {
+            return ChunkFilter.applyChunkFilter(rowsetToFilter, this, usePrev,
                     ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys));
+        } finally {
+            final long t1 = System.nanoTime();
+            chunkFilter.sample(t1 - t0);
         }
     }
 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
index a431891f94d..b5cec945450 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
@@ -198,12 +198,19 @@ public interface MemoizableOperation<T extends DynamicNode & NotificationStepRec
             Configuration.getInstance().getBooleanWithDefault("QueryTable.redirectSelect", false);
 
     /**
-     * If set to true, then permit where filters to use a data index, when applicable. If false, data indexes are not
-     * used even if present.
+     * If the Configuration property "QueryTable.useDataIndexForWhere" is set to true (default), then permit where
+     * filters to use a data index, when applicable. If false, data indexes are not used even if present.
      */
     public static boolean USE_DATA_INDEX_FOR_WHERE =
             Configuration.getInstance().getBooleanWithDefault("QueryTable.useDataIndexForWhere", true);
 
+    /**
+     * If the Configuration property "QueryTable.useDataIndexForAggregation" is set to true (default), then permit
+     * aggregation to use a data index, when applicable. If false, data indexes are not used even if present.
+     */
+    public static boolean USE_DATA_INDEX_FOR_AGGREGATION =
+            Configuration.getInstance().getBooleanWithDefault("QueryTable.useDataIndexForAggregation", true);
+
     /**
      * For a static select(), we would prefer to flatten the table to avoid using memory unnecessarily (because the data
      * may be spread out across many blocks depending on the input RowSet). However, the select() can become slower
@@ -865,8 +873,10 @@ public QueryTable aggNoMemo(
         final UpdateGraph updateGraph = getUpdateGraph();
         try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
             final String description = "aggregation(" + aggregationContextFactory + ", " + groupByColumns + ")";
+            final AggregationControl aggregationControl =
+                    USE_DATA_INDEX_FOR_AGGREGATION ? AggregationControl.DEFAULT : AggregationControl.IGNORE_INDEXING;
             return QueryPerformanceRecorder.withNugget(description, sizeForInstrumentation(),
-                    () -> ChunkedOperatorAggregationHelper.aggregation(
+                    () -> ChunkedOperatorAggregationHelper.aggregation(aggregationControl,
                             aggregationContextFactory, this, preserveEmpty, initialGroups, groupByColumns));
         }
     }
@@ -1237,13 +1247,14 @@ private void initializeAndPrioritizeFilters(@NotNull final WhereFilter... filter
         final int numFilters = filters.length;
         final BitSet priorityFilterIndexes = new BitSet(numFilters);
 
-        final QueryCompilerRequestProcessor.BatchProcessor compilationProcesor = QueryCompilerRequestProcessor.batch();
+        final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
+
         // Initialize our filters immediately so we can examine the columns they use. Note that filter
         // initialization is safe to invoke repeatedly.
         for (final WhereFilter filter : filters) {
-            filter.init(getDefinition(), compilationProcesor);
+            filter.init(getDefinition(), compilationProcessor);
         }
-        compilationProcesor.compile();
+        compilationProcessor.compile();
 
         for (int fi = 0; fi < numFilters; ++fi) {
             final WhereFilter filter = filters[fi];
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java
index 95311871f96..b7f960eb5f8 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java
@@ -6,6 +6,7 @@
 import io.deephaven.base.verify.Assert;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.DataIndex;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.impl.indexer.DataIndexer;
 import org.jetbrains.annotations.NotNull;
@@ -91,14 +92,14 @@ public List<String> keyColumnNames() {
 
     @Override
     @NotNull
-    public Table table() {
-        return sourceIndex.table();
+    public Table table(final DataIndexOptions options) {
+        return sourceIndex.table(options);
     }
 
     @Override
     @NotNull
-    public RowKeyLookup rowKeyLookup() {
-        return sourceIndex.rowKeyLookup();
+    public RowKeyLookup rowKeyLookup(final DataIndexOptions options) {
+        return sourceIndex.rowKeyLookup(options);
     }
 
     @Override
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java
index b496c96171b..cf4eafbf9c7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java
@@ -6,6 +6,7 @@
 import io.deephaven.engine.liveness.LivenessArtifact;
 import io.deephaven.engine.table.BasicDataIndex;
 import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.Table;
 import org.jetbrains.annotations.NotNull;
 
@@ -63,7 +64,7 @@ public String rowSetColumnName() {
 
     @Override
     @NotNull
-    public Table table() {
+    public Table table(final DataIndexOptions unused) {
         return table;
     }
 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java
index 4802143a9a1..d2a9d8659a1 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java
@@ -9,6 +9,7 @@
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.liveness.LivenessScopeStack;
 import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.impl.QueryTable;
 import io.deephaven.engine.table.impl.by.*;
@@ -83,7 +84,7 @@ public Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn() {
 
     @Override
     @NotNull
-    public Table table() {
+    public Table table(final DataIndexOptions unused) {
         Table localIndexTable;
         if ((localIndexTable = indexTable) != null) {
             return localIndexTable;
@@ -136,8 +137,8 @@ private Table computeTable() {
 
     @Override
     @NotNull
-    public RowKeyLookup rowKeyLookup() {
-        table();
+    public RowKeyLookup rowKeyLookup(final DataIndexOptions options) {
+        table(options);
         return (final Object key, final boolean usePrev) -> {
             // Pass the object to the aggregation lookup, then return the resulting row key. This index will be
             // correct in prev or current space because of the aggregation's hash-based lookup.
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java
index 7759adcaa49..c9f923566b4 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java
@@ -10,11 +10,7 @@
 import io.deephaven.engine.liveness.LivenessScopeStack;
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.rowset.WritableRowSet;
-import io.deephaven.engine.table.BasicDataIndex;
-import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.engine.table.DataIndex;
-import io.deephaven.engine.table.DataIndexTransformer;
-import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.*;
 import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
 import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
@@ -72,7 +68,7 @@ public String rowSetColumnName() {
 
     @Override
     @NotNull
-    public Table table() {
+    public Table table(final DataIndexOptions unused) {
         Table localIndexTable;
         if ((localIndexTable = indexTable) != null) {
             return localIndexTable;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java
index 431d00ee7e7..356bd05f8b7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java
@@ -191,7 +191,10 @@ public Runnable createUpdateHandler(
             // If we have shifts, that makes everything nasty; so we do not want to deal with it
             final boolean hasShifts = upstream.shifted().nonempty();
 
-            final boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe()
+            // liveResultOwner is only null when we are static; in the static case there is no need to
+            // worry about serial table operation checking
+            final boolean serialTableOperationsSafe = liveResultOwner == null
+                    || updateGraph.serialTableOperationsSafe()
                     || updateGraph.sharedLock().isHeldByCurrentThread()
                     || updateGraph.exclusiveLock().isHeldByCurrentThread();
 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java
index 881e8a21e88..a19923566c8 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java
@@ -4,17 +4,20 @@
 package io.deephaven.engine.table.impl.sources.regioned;
 
 import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.api.ColumnName;
+import io.deephaven.api.Selectable;
+import io.deephaven.base.stats.Counter;
+import io.deephaven.base.stats.Stats;
+import io.deephaven.base.stats.Value;
 import io.deephaven.base.verify.Assert;
 import io.deephaven.base.verify.Require;
+import io.deephaven.configuration.Configuration;
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.rowset.RowSequence;
 import io.deephaven.engine.rowset.RowSet;
 import io.deephaven.engine.rowset.RowSetBuilderSequential;
 import io.deephaven.engine.rowset.RowSetFactory;
-import io.deephaven.engine.table.BasicDataIndex;
-import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.engine.table.PartitionedTableFactory;
-import io.deephaven.engine.table.Table;
+import io.deephaven.engine.table.*;
 import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
 import io.deephaven.engine.table.impl.by.AggregationProcessor;
 import io.deephaven.engine.table.impl.by.AggregationRowLookup;
@@ -23,14 +26,17 @@
 import io.deephaven.engine.table.impl.locations.TableLocation;
 import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
+import io.deephaven.engine.table.impl.select.MultiSourceFunctionalColumn;
 import io.deephaven.engine.table.impl.select.SelectColumn;
-import io.deephaven.util.SafeCloseable;
 import io.deephaven.util.annotations.InternalUseOnly;
 import io.deephaven.vector.ObjectVector;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 /**
  * DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a
@@ -45,6 +51,21 @@
  */
 @InternalUseOnly
 class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex {
+    /**
+     * The duration in nanos to build a DataIndex table.
+     */
+    public static final Value buildIndexTable = Stats
+            .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in nanos of building an index").getValue();
+
+    /**
+     * When merging row sets from multiple component DataIndex structures, reading each individual component can be
+     * performed in parallel to improve the observed wall-clock I/O time. For cases where the number of individual
+     * groups to merge is small and the operation is I/O bound, this can be very beneficial. Setting up the
+     * parallelism, however, adds overhead. Setting the Configuration property "MergedDataIndex.useParallelLazyFetch"
+     * to false disables this behavior, and each key's merged RowSet is computed serially.
+     */
+    public static boolean USE_PARALLEL_LAZY_FETCH = Configuration.getInstance()
+            .getBooleanWithDefault("MergedDataIndex.useParallelLazyFetch", true);
 
     private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable";
 
@@ -63,6 +84,12 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl
      */
     private volatile Table indexTable;
 
+    /**
+     * A lazy version of the table; note that this value is never set if indexTable is set. A lazy table can be
+     * converted to an indexTable by selecting the rowset column.
+     */
+    private volatile Table lazyTable;
+
     /**
      * Whether this index is known to be corrupt.
      */
@@ -113,19 +140,29 @@ public Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn() {
 
     @Override
     @NotNull
-    public Table table() {
+    public Table table(final DataIndexOptions options) {
         Table localIndexTable;
         if ((localIndexTable = indexTable) != null) {
             return localIndexTable;
         }
+        final boolean lazyRowsetMerge = options.operationUsesPartialTable();
+        if (lazyRowsetMerge) {
+            if ((localIndexTable = lazyTable) != null) {
+                return localIndexTable;
+            }
+        }
         synchronized (this) {
             if ((localIndexTable = indexTable) != null) {
                 return localIndexTable;
+            } else if (lazyRowsetMerge) {
+                if ((localIndexTable = lazyTable) != null) {
+                    return localIndexTable;
+                }
             }
             try {
                 return QueryPerformanceRecorder.withNugget(
                         String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)),
-                        ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
+                        ForkJoinPoolOperationInitializer.ensureParallelizable(() -> buildTable(lazyRowsetMerge)));
             } catch (Throwable t) {
                 isCorrupt = true;
                 throw t;
@@ -133,45 +170,155 @@ public Table table() {
         }
     }
 
-    private Table buildTable() {
-        final Table locationTable = columnSourceManager.locationTable().coalesce();
+    /**
+     * The RowsetCacher is a bit of a hack that allows us to avoid reading actual rowsets from disk until they are
+     * actually required for a query operation. We are breaking engine rules in that we reference the source
+     * ColumnSource directly and do not have correct dependencies encoded in a select. MergedDataIndexes are only
+     * permitted for a static table, so we can get away with this.
+     *
+     * <p>
+     * Once a RowSet has been materialized, we record the result in an AtomicReferenceArray so that it need not be
+     * read repeatedly. If two threads attempt to fetch the same rowset concurrently, we use a placeholder object in
+     * the array to avoid duplicated work.
+     * </p>
+     */
+    private static class RowsetCacher {
+        final ColumnSource<ObjectVector<RowSet>> source;
+        final AtomicReferenceArray<Object> results;
 
-        // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by
-        // the appropriate region offset.
-        // This potentially loads many small row sets into memory, but it avoids the risk of re-materializing row set
-        // pages during the accumulation phase.
-        final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
-        final Table locationDataIndexes = locationTable
-                .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
-                        columnSourceManager.locationColumnName(), TableLocation.class,
-                        LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class,
-                        (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets(
-                                locationRowKey, location, keyColumnNamesArray)))))
-                .dropColumns(columnSourceManager.locationColumnName());
-
-        // Merge all the location index tables into a single table
-        final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge();
-
-        // Group the merged data indexes by the keys
-        final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray);
-
-        // Combine the row sets from each group into a single row set
-        final Table combined = groupedByKeyColumns
-                .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
+        private RowsetCacher(final ColumnSource<ObjectVector<RowSet>> source, final int capacity) {
+            this.source = source;
+            this.results = new AtomicReferenceArray<>(capacity);
+        }
+
+        RowSet get(final long rowKey) {
+            if (rowKey < 0 || rowKey >= results.length()) {
+                return null;
+            }
+
+            final int iRowKey = (int) rowKey;
+            do {
+                final Object localResult = results.get(iRowKey);
+                if (localResult instanceof RowSet) {
+                    return (RowSet) localResult;
+                }
+                if (localResult instanceof Exception) {
+                    throw new UncheckedDeephavenException("Exception found for cached RowSet",
+                            (Exception) localResult);
+                }
+
+                if (localResult != null) {
+                    // noinspection EmptySynchronizedStatement
+                    synchronized (localResult) {
+                        // don't care to do anything, we are just waiting for the barrier to be done
+                    }
+                    continue;
+                }
+
+                // we need to create our own placeholder, and synchronize on it first
+                final ReentrantLock placeholder = new ReentrantLock();
+                // noinspection SynchronizationOnLocalVariableOrMethodParameter
+                synchronized (placeholder) {
+                    if (!results.compareAndSet(iRowKey, null, placeholder)) {
+                        // we must try again, someone else has claimed the placeholder
+                        continue;
+                    }
+                    // it is our responsibility to get the right answer
+                    final ObjectVector<RowSet> inputRowsets = source.get(rowKey);
+                    Assert.neqNull(inputRowsets, "inputRowsets");
+                    assert inputRowsets != null;
+
+                    // need to get the value and set it into our own value
+                    final RowSet computedResult;
+                    try {
+                        computedResult = mergeRowSets(rowKey, inputRowsets);
+                    } catch (Exception e) {
+                        if (!results.compareAndSet(iRowKey, placeholder, e)) {
+                            throw new IllegalStateException("another thread changed our cache placeholder!");
+                        }
+                        throw e;
+                    }
+                    if (!results.compareAndSet(iRowKey, placeholder, computedResult)) {
+                        throw new IllegalStateException("another thread changed our cache placeholder!");
+                    }
+
+                    return computedResult;
+                }
+            } while (true);
+        }
+    }
+
+    private Table buildTable(final boolean lazyRowsetMerge) {
+        if (lazyTable != null) {
+            if (lazyRowsetMerge) {
+                return lazyTable;
+            } else {
+                indexTable = lazyTable.select();
+                lazyTable = null;
+                return indexTable;
+            }
+        }
+
+        final long t0 = System.nanoTime();
+        try {
+            final Table locationTable = columnSourceManager.locationTable().coalesce();
+
+            // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted
+            // by the appropriate region offset. The row sets are not forced into memory, but the keys are, to enable
+            // efficient grouping. The row sets are read into memory as part of the mergeRowSets call.
+            final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
+            final Table locationDataIndexes = locationTable
+                    .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
+                            columnSourceManager.locationColumnName(), TableLocation.class,
+                            LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class,
+                            (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets(
+                                    locationRowKey, location, keyColumnNamesArray)))))
+                    .dropColumns(columnSourceManager.locationColumnName());
+
+            // Merge all the location index tables into a single table
+            final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge();
+
+            // Group the merged data indexes by the keys
+            final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray);
+
+            final Table combined;
+            if (lazyRowsetMerge) {
+                final ColumnSource<ObjectVector<RowSet>> vectorColumnSource =
+                        groupedByKeyColumns.getColumnSource(ROW_SET_COLUMN_NAME);
+
+                // we are using a regular array here, so the table must be flat; row keys double as array indices
+                Assert.assertion(groupedByKeyColumns.isFlat(), "groupedByKeyColumns.isFlat()");
+                final RowsetCacher rowsetCacher = new RowsetCacher(vectorColumnSource, groupedByKeyColumns.intSize());
+                // TODO: it would be better to have a magic holder that watches the rowset column source and merges
+                // lazily, rather than this functional column, which requires an input and does not cache anything
+                combined = groupedByKeyColumns
+                        .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(),
+                                ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k)))));
+            } else {
+                // Combine the row sets from each group into a single row set
+                final List<SelectColumn> mergeFunction = List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
                         ROW_SET_COLUMN_NAME, ObjectVector.class,
                         ROW_SET_COLUMN_NAME, RowSet.class,
-                        this::mergeRowSets))));
-        Assert.assertion(combined.isFlat(), "combined.isFlat()");
-        Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");
+                        MergedDataIndex::mergeRowSets)));
 
-        // Cleanup after ourselves
-        try (final CloseableIterator<RowSet> rowSets = mergedDataIndexes.objectColumnIterator(ROW_SET_COLUMN_NAME)) {
-            rowSets.forEachRemaining(SafeCloseable::close);
-        }
+                combined = groupedByKeyColumns.update(mergeFunction);
+            }
+            Assert.assertion(combined.isFlat(), "combined.isFlat()");
+            Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");
 
-        lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns);
-        indexTable = combined;
-        return combined;
+            lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns);
+
+            if (lazyRowsetMerge) {
+                lazyTable = combined;
+            } else {
+                indexTable = combined;
+            }
+
+            return combined;
+        } finally {
+            final long t1 = System.nanoTime();
+            buildIndexTable.sample(t1 - t0);
+        }
     }
 
     private static Table loadIndexTableAndShiftRowSets(
@@ -184,27 +331,60 @@ private static Table loadIndexTableAndShiftRowSets(
                     String.join(", ", keyColumnNames), location));
         }
         final Table indexTable = dataIndex.table();
-        return indexTable.coalesce().update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
-                dataIndex.rowSetColumnName(), RowSet.class,
-                ROW_SET_COLUMN_NAME, RowSet.class,
-                (final RowSet rowSet) -> rowSet
-                        .shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)))))));
+        final long shiftAmount = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey));
+        final Table coalesced = indexTable.coalesce();
+
+        // pull the key columns into memory while we are still executing in parallel
+        final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames);
+
+        final Selectable shiftFunction;
+        if (shiftAmount == 0) {
+            shiftFunction =
+                    Selectable.of(ColumnName.of(ROW_SET_COLUMN_NAME), ColumnName.of(dataIndex.rowSetColumnName()));
+        } else {
+            shiftFunction = new FunctionalColumn<>(
+                    dataIndex.rowSetColumnName(), RowSet.class,
+                    ROW_SET_COLUMN_NAME, RowSet.class,
+                    (final RowSet rowSet) -> rowSet.shift(shiftAmount));
+        }
+        // the rowset column shift need not occur until we perform the rowset merge operation, which is either lazy
+        // or part of an update (and can itself be parallel)
+        return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction)));
     }
 
-    private RowSet mergeRowSets(
+    private static RowSet mergeRowSets(
             @SuppressWarnings("unused") final long unusedRowKey,
             @NotNull final ObjectVector<RowSet> keyRowSets) {
+        final long numRowSets = keyRowSets.size();
+
+        if (numRowSets == 1) {
+            // we steal the reference; the input is never used again
+            return keyRowSets.get(0);
+        }
         final RowSetBuilderSequential builder = RowSetFactory.builderSequential();
-        try (final CloseableIterator<RowSet> rowSets = keyRowSets.iterator()) {
-            rowSets.forEachRemaining(builder::appendRowSequence);
+
+        if (USE_PARALLEL_LAZY_FETCH) {
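+            // builderSequential needs ascending appends; the shifted row sets are disjoint, so order by first key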
+            LongStream.range(0, numRowSets).parallel().mapToObj(keyRowSets::get)
+                    .sorted(Comparator.comparingLong(RowSet::firstRowKey)).forEachOrdered(rs -> {
+                        builder.appendRowSequence(rs);
+                        rs.close();
+                    });
+        } else {
+            try (final CloseableIterator<RowSet> rowSets = keyRowSets.iterator()) {
+                rowSets.forEachRemaining(rs -> {
+                    builder.appendRowSequence(rs);
+                    rs.close();
+                });
+            }
         }
         return builder.build();
     }
 
     @Override
     @NotNull
-    public RowKeyLookup rowKeyLookup() {
-        table();
+    public RowKeyLookup rowKeyLookup(final DataIndexOptions options) {
+        table(options);
         return (final Object key, final boolean usePrev) -> {
             // Pass the object to the aggregation lookup, then return the resulting row position (which is also the row
             // key).
@@ -232,13 +411,8 @@ public boolean isValid() {
         final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
         try (final CloseableIterator<TableLocation> locations =
                 columnSourceManager.locationTable().objectColumnIterator(columnSourceManager.locationColumnName())) {
-            while (locations.hasNext()) {
-                if (!locations.next().hasDataIndex(keyColumnNamesArray)) {
-                    return isValid = false;
-                }
-            }
+            return isValid = locations.stream().parallel().allMatch(l -> l.hasDataIndex(keyColumnNamesArray));
         }
-        return isValid = true;
     }
 
     @Override
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java
index 4e7a9bb1a8d..1953470475e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java
@@ -8,6 +8,7 @@
 import io.deephaven.base.verify.Require;
 import io.deephaven.engine.rowset.*;
 import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.DataIndexOptions;
 import io.deephaven.engine.table.ModifiedColumnSet;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableUpdate;
@@ -300,13 +301,13 @@ public Map<ColumnSource<?>, String> keyColumnNamesByIndexedColumn() {
 
     @Override
     @NotNull
-    public Table table() {
+    public Table table(final DataIndexOptions unused) {
         return indexTable;
     }
 
     @Override
     @NotNull
-    public RowKeyLookup rowKeyLookup() {
+    public RowKeyLookup rowKeyLookup(final DataIndexOptions unusedOptions) {
         return (final Object key, final boolean usePrev) -> keyPositionMap.get(key);
     }
 
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java
index 0aeb1b05a07..8a2d6817681 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java
@@ -482,7 +482,7 @@ private DataIndexImpl(@NotNull final Table table) {
         }
 
         @Override
-        public @NotNull Table table() {
+        public @NotNull Table table(DataIndexOptions ignored) {
             return table;
         }