From a15b9984cf1fb3f44b9b1ec3b7bfcef28ab020de Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Fri, 3 Jan 2025 14:19:20 -0500 Subject: [PATCH 01/24] DH-18300: Improve DataIndex performance. DataIndex, particularly when used for where() filters, missed parallelization opportunities and would read more data than strictly necessary to satisfy the filter. Statistics have been added to various operations. The existing Value class was not thread safe; its internal state has been updated to use volatiles and AtomicLongFieldUpdaters. The following Configuration properties have been added: - AbstractColumnSource.usePartialDataIndex - AbstractColumnSource.useParallelIndexBuild - QueryTable.useDataIndexForAggregation - MergedDataIndex.useParallelLazyFetch --- .../java/io/deephaven/base/stats/Value.java | 47 ++- CONTRIBUTING.md | 5 + .../engine/table/BasicDataIndex.java | 32 +- .../io/deephaven/engine/table/DataIndex.java | 7 +- .../engine/table/DataIndexOptions.java | 65 ++++ .../table/impl/AbstractColumnSource.java | 208 +++++++++---- .../engine/table/impl/QueryTable.java | 23 +- .../impl/dataindex/RemappedDataIndex.java | 9 +- .../impl/dataindex/StandaloneDataIndex.java | 3 +- .../impl/dataindex/TableBackedDataIndex.java | 7 +- .../impl/dataindex/TransformedDataIndex.java | 8 +- .../select/analyzers/SelectColumnLayer.java | 5 +- .../sources/regioned/MergedDataIndex.java | 288 ++++++++++++++---- .../regioned/PartitioningColumnDataIndex.java | 5 +- .../TestRegionedColumnSourceManager.java | 2 +- 15 files changed, 566 insertions(+), 148 deletions(-) create mode 100644 engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index cb8c098b9b4..65ee41424a1 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -3,14 +3,22 @@ // package 
io.deephaven.base.stats; -public abstract class Value { +import java.text.DecimalFormat; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; - protected long n = 0; - protected long last = 0; - protected long sum = 0; - protected long sum2 = 0; - protected long max = Long.MIN_VALUE; - protected long min = Long.MAX_VALUE; +public abstract class Value { + private static final AtomicLongFieldUpdater N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n"); + private static final AtomicLongFieldUpdater SUM_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); + private static final AtomicLongFieldUpdater SUM2_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); + + volatile protected long n = 0; + volatile protected long last = 0; + volatile protected long sum = 0; + volatile protected long sum2 = 0; + volatile protected long max = Long.MIN_VALUE; + volatile protected long min = Long.MAX_VALUE; private boolean alwaysUpdated = false; @@ -52,10 +60,10 @@ protected Value(History history) { } public void sample(long x) { - n++; + N_UPDATER.incrementAndGet(this); + SUM_UPDATER.addAndGet(this, x); + SUM2_UPDATER.addAndGet(this, x * x); last = x; - sum += x; - sum2 += x * x; if (x > max) { max = x; } @@ -99,4 +107,23 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon } } } + + @Override + public String toString() { + final DecimalFormat format = new DecimalFormat("#,###"); + final DecimalFormat avgFormat = new DecimalFormat("#,###.###"); + + final double variance = n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN; + + return "Value{" + + "n=" + format.format(n) + + (n > 0 ? ", sum=" + format.format(sum) + + ", max=" + format.format(max) + + ", min=" + format.format(min) + + ", avg=" + avgFormat.format((n > 0 ? 
(double) sum / n : Double.NaN)) + ", std=" + avgFormat.format(Math.sqrt(variance)) + : "") + + '}'; + } } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 39aea6d0072..b1f17cddfcf 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -54,6 +54,11 @@ For more information, see: * [gh pr create](https://cli.github.com/manual/gh_pr_create) * [CLI In Use](https://cli.github.com/manual/examples.html) +## Labels + +- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label. +- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label. + ## Styleguide The [styleguide](style/README.md) is applied globally to the entire project, except for generated code that gets checked in. To apply the styleguide, run `./gradlew spotlessApply`. diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index 13ff41aa000..f1f09f096be 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java @@ -96,16 +96,43 @@ default ColumnSource[] keyColumns(@NotNull final ColumnSource[] indexedCol @FinalDefault @NotNull default ColumnSource rowSetColumn() { - return table().getColumnSource(rowSetColumnName(), RowSet.class); + return rowSetColumn(DataIndexOptions.DEFAULT); + } + + /** + * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}. + * + * @return The {@link RowSet} {@link ColumnSource} + */ + @FinalDefault + @NotNull + default ColumnSource rowSetColumn(final DataIndexOptions options) { + return table(options).getColumnSource(rowSetColumnName(), RowSet.class); } /** * Get the {@link Table} backing this data index. + * + *

+ * The returned table is fully in-memory, equivalent to {@link #table(DataIndexOptions)} with default options. + *

* * @return The {@link Table} */ @NotNull - Table table(); + default Table table() { + return table(DataIndexOptions.DEFAULT); + } + + /** + * Get the {@link Table} backing this data index. + * + * @param options parameters to control the returned table + * + * @return The {@link Table} + */ + @NotNull + Table table(DataIndexOptions options); /** * Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force @@ -115,4 +142,5 @@ default ColumnSource rowSetColumn() { * otherwise */ boolean isRefreshing(); + } diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index b257bdbc8d0..3dbdc8eeca7 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -48,7 +48,12 @@ interface RowKeyLookup { * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key */ @NotNull - RowKeyLookup rowKeyLookup(); + default RowKeyLookup rowKeyLookup() { + return rowKeyLookup(DataIndexOptions.DEFAULT); + } + + @NotNull + RowKeyLookup rowKeyLookup(DataIndexOptions options); /** * Return a {@link RowKeyLookup lookup function} function of index row keys for this index. If diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java new file mode 100644 index 00000000000..becd0948b2c --- /dev/null +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -0,0 +1,65 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table; + +import io.deephaven.annotations.BuildableStyle; +import io.deephaven.api.filter.Filter; +import org.immutables.value.Value; + +/** + * Options for controlling the function of a {@link DataIndex}. + * + *

+ * Presently, this is used for the {@link Table#where(Filter)} operation to more efficiently handle data index matches, + * without necessarily reading all RowSet information from disk across partitions. + *

+ */ +@Value.Immutable +@BuildableStyle +public interface DataIndexOptions { + DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); + + /** + * Does this operation use only a subset of the DataIndex? + * + *

+ * The DataIndex implementation may use this hint to defer work for some row sets. + *

+ * + * @return if this operation is only going to use a subset of this data index + */ + @Value.Default + default boolean operationUsesPartialTable() { + return false; + } + + /** + * Create a new builder for a {@link DataIndexOptions}. + * + * @return a new {@link Builder} for {@link DataIndexOptions} + */ + static Builder builder() { + return ImmutableDataIndexOptions.builder(); + } + + /** + * The builder interface to construct a {@link DataIndexOptions}. + */ + interface Builder { + /** + * Set whether this operation only uses a subset of the data index. + * + * @param usesPartialTable true if this operation only uses a partial table + * @return this builder + */ + Builder operationUsesPartialTable(boolean usesPartialTable); + + /** + * Build the {@link DataIndexOptions}. + * + * @return an immutable DataIndexOptions structure. + */ + DataIndexOptions build(); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index 65d434d6598..eae1bf0cb5b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -3,6 +3,9 @@ // package io.deephaven.engine.table.impl; +import io.deephaven.base.stats.Counter; +import io.deephaven.base.stats.Stats; +import io.deephaven.base.stats.Value; import io.deephaven.base.string.cache.CharSequenceUtils; import io.deephaven.base.verify.Assert; import io.deephaven.chunk.Chunk; @@ -10,12 +13,14 @@ import io.deephaven.chunk.ObjectChunk; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; +import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; import 
io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.DataIndex; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.chunkfillers.ChunkFiller; import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter; @@ -34,12 +39,48 @@ import java.time.Instant; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Collections; public abstract class AbstractColumnSource implements ColumnSource, DefaultChunkSource.WithPrev { + /** + * For a {@link #match(boolean, boolean, boolean, DataIndex, RowSet, Object...)} call that uses a DataIndex, by + * default we do not force the entire DataIndex to be loaded into memory. This is because many + * {@link io.deephaven.engine.table.impl.select.MatchFilter}s are highly selective and only need to instantiate a + * single RowSet value rather than the complete DataIndex for the entire table. When the Configuration property + * "AbstractColumnSource.usePartialDataIndex" is set to false, the query engine materializes the entire DataIndex + * table for the match call. + */ + public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance() + .getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true); + /** + * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result Index + * can be built in serial or in parallel. By default, the index is built in parallel which may take advantage of + * using more threads for I/O of the index data structure. Parallel builds do require more setup and thread + * synchronization, so they can be disabled by setting the Configuration property + * "AbstractColumnSource.useParallelIndexBuild" to false. 
+ */ + public static boolean USE_PARALLEL_INDEX_BUILD = Configuration.getInstance() + .getBooleanWithDefault("AbstractColumnSource.useParallelIndexBuild", true); + + /** + * Duration of match() calls using a DataIndex (also provides the count). + */ + public static final Value indexFilter = + Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY, + "Duration of match() with a DataIndex in nanos") + .getValue(); + /** + * Duration of match() calls using a chunk filter (i.e. no DataIndex). + */ + public static final Value chunkFilter = + Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY, + "Duration of match() without a DataIndex in nanos") + .getValue(); + /** * Minimum average run length in an {@link RowSequence} that should trigger {@link Chunk}-filling by key ranges * instead of individual keys. @@ -48,6 +89,13 @@ public abstract class AbstractColumnSource implements private static final int CHUNK_SIZE = 1 << 11; + /** + * The match operation does not use the complete table, it just picks out the relevant indices so there is no reason + * to actually read it fully. + */ + private static final DataIndexOptions PARTIAL_TABLE_DATA_INDEX = + DataIndexOptions.builder().operationUsesPartialTable(true).build(); + protected final Class type; protected final Class componentType; @@ -115,82 +163,134 @@ public WritableRowSet match( final boolean usePrev, final boolean caseInsensitive, @Nullable final DataIndex dataIndex, - @NotNull final RowSet mapper, + @NotNull final RowSet rowsetToFilter, final Object... 
keys) { + if (dataIndex == null) { + return doChunkFilter(invertMatch, usePrev, caseInsensitive, rowsetToFilter, keys); + } + final long t0 = System.nanoTime(); + try { + return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys); + } finally { + final long t1 = System.nanoTime(); + indexFilter.sample(t1 - t0); + } + } - if (dataIndex != null) { - final Table indexTable = dataIndex.table(); - final RowSet matchingIndexRows; - if (caseInsensitive && type == String.class) { - // Linear scan through the index table, accumulating index row keys for case-insensitive matches - final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential(); - - // noinspection rawtypes - final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey()); - // noinspection unchecked - Collections.addAll(keySet, keys); - - final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet(); - final ColumnSource indexKeySource = - indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class); - - final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size()); - try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator(); - final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) { - while (indexRowSetIterator.hasMore()) { - final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize); - final ObjectChunk chunkKeys = (usePrev - ? 
indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows) - : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk(); - final LongChunk chunkRowKeys = chunkIndexRows.asRowKeyChunk(); - final int thisChunkSize = chunkKeys.size(); - for (int ii = 0; ii < thisChunkSize; ++ii) { - final String key = chunkKeys.get(ii); - if (keySet.containsKey(key)) { - matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii)); - } + private WritableRowSet doDataIndexFilter(final boolean invertMatch, + final boolean usePrev, + final boolean caseInsensitive, + @NotNull final DataIndex dataIndex, + @NotNull final RowSet rowsetToFilter, + final Object[] keys) { + final DataIndexOptions partialOption = + USE_PARTIAL_TABLE_DATA_INDEX ? PARTIAL_TABLE_DATA_INDEX : DataIndexOptions.DEFAULT; + + final Table indexTable = dataIndex.table(partialOption); + + final RowSet matchingIndexRows; + if (caseInsensitive && type == String.class) { + // Linear scan through the index table, accumulating index row keys for case-insensitive matches + final RowSetBuilderSequential matchingIndexRowsBuilder = RowSetFactory.builderSequential(); + + // noinspection rawtypes + final KeyedObjectHashSet keySet = new KeyedObjectHashSet<>(new CIStringKey()); + // noinspection unchecked + Collections.addAll(keySet, keys); + + final RowSet indexRowSet = usePrev ? indexTable.getRowSet().prev() : indexTable.getRowSet(); + final ColumnSource indexKeySource = + indexTable.getColumnSource(dataIndex.keyColumnNames().get(0), String.class); + + final int chunkSize = (int) Math.min(CHUNK_SIZE, indexRowSet.size()); + try (final RowSequence.Iterator indexRowSetIterator = indexRowSet.getRowSequenceIterator(); + final GetContext indexKeyGetContext = indexKeySource.makeGetContext(chunkSize)) { + while (indexRowSetIterator.hasMore()) { + final RowSequence chunkIndexRows = indexRowSetIterator.getNextRowSequenceWithLength(chunkSize); + final ObjectChunk chunkKeys = (usePrev + ? 
indexKeySource.getPrevChunk(indexKeyGetContext, chunkIndexRows) + : indexKeySource.getChunk(indexKeyGetContext, chunkIndexRows)).asObjectChunk(); + final LongChunk chunkRowKeys = chunkIndexRows.asRowKeyChunk(); + final int thisChunkSize = chunkKeys.size(); + for (int ii = 0; ii < thisChunkSize; ++ii) { + final String key = chunkKeys.get(ii); + if (keySet.containsKey(key)) { + matchingIndexRowsBuilder.appendKey(chunkRowKeys.get(ii)); } } } - matchingIndexRows = matchingIndexRowsBuilder.build(); - } else { - // Use the lookup function to get the index row keys for the matching keys - final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom(); - - final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup(); - for (Object key : keys) { - final long rowKey = rowKeyLookup.apply(key, usePrev); - if (rowKey != RowSequence.NULL_ROW_KEY) { - matchingIndexRowsBuilder.addKey(rowKey); - } + } + matchingIndexRows = matchingIndexRowsBuilder.build(); + } else { + // Use the lookup function to get the index row keys for the matching keys + final RowSetBuilderRandom matchingIndexRowsBuilder = RowSetFactory.builderRandom(); + + final DataIndex.RowKeyLookup rowKeyLookup = dataIndex.rowKeyLookup(partialOption); + for (final Object key : keys) { + final long rowKey = rowKeyLookup.apply(key, usePrev); + if (rowKey != RowSequence.NULL_ROW_KEY) { + matchingIndexRowsBuilder.addKey(rowKey); } - matchingIndexRows = matchingIndexRowsBuilder.build(); } + matchingIndexRows = matchingIndexRowsBuilder.build(); + } - try (final SafeCloseable ignored = matchingIndexRows) { - final WritableRowSet filtered = invertMatch ? mapper.copy() : RowSetFactory.empty(); - if (matchingIndexRows.isNonempty()) { - final ColumnSource indexRowSetSource = usePrev - ? dataIndex.rowSetColumn().getPrevSource() - : dataIndex.rowSetColumn(); + try (final SafeCloseable ignored = matchingIndexRows) { + final WritableRowSet filtered = invertMatch ? 
rowsetToFilter.copy() : RowSetFactory.empty(); + if (matchingIndexRows.isNonempty()) { + final ColumnSource indexRowSetSource = usePrev + ? dataIndex.rowSetColumn(partialOption).getPrevSource() + : dataIndex.rowSetColumn(partialOption); + + if (USE_PARALLEL_INDEX_BUILD) { + final long[] rowKeyArray = new long[matchingIndexRows.intSize()]; + matchingIndexRows.toRowKeyArray(rowKeyArray); + Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> { + final RowSet matchingRowSet = indexRowSetSource.get(rowKey); + assert matchingRowSet != null; + if (invertMatch) { + synchronized (filtered) { + filtered.remove(matchingRowSet); + } + } else { + try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) { + synchronized (filtered) { + filtered.insert(intersected); + } + } + } + }); + } else { try (final CloseableIterator matchingIndexRowSetIterator = ChunkedColumnIterator.make(indexRowSetSource, matchingIndexRows)) { matchingIndexRowSetIterator.forEachRemaining((final RowSet matchingRowSet) -> { if (invertMatch) { filtered.remove(matchingRowSet); } else { - try (final RowSet intersected = matchingRowSet.intersect(mapper)) { + try (final RowSet intersected = matchingRowSet.intersect(rowsetToFilter)) { filtered.insert(intersected); } } }); } } - return filtered; } - } else { - return ChunkFilter.applyChunkFilter(mapper, this, usePrev, + return filtered; + } + } + + private WritableRowSet doChunkFilter(final boolean invertMatch, + final boolean usePrev, + final boolean caseInsensitive, + @NotNull final RowSet rowsetToFilter, + final Object[] keys) { + final long t0 = System.nanoTime(); + try { + return ChunkFilter.applyChunkFilter(rowsetToFilter, this, usePrev, ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys)); + } finally { + final long t1 = System.nanoTime(); + chunkFilter.sample(t1 - t0); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index a431891f94d..b5cec945450 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -198,12 +198,20 @@ public interface MemoizableOperation ChunkedOperatorAggregationHelper.aggregation( + () -> ChunkedOperatorAggregationHelper.aggregation(aggregationControl, aggregationContextFactory, this, preserveEmpty, initialGroups, groupByColumns)); } } @@ -1237,13 +1247,14 @@ private void initializeAndPrioritizeFilters(@NotNull final WhereFilter... filter final int numFilters = filters.length; final BitSet priorityFilterIndexes = new BitSet(numFilters); - final QueryCompilerRequestProcessor.BatchProcessor compilationProcesor = QueryCompilerRequestProcessor.batch(); + final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch(); + // Initialize our filters immediately so we can examine the columns they use. Note that filter // initialization is safe to invoke repeatedly. 
for (final WhereFilter filter : filters) { - filter.init(getDefinition(), compilationProcesor); + filter.init(getDefinition(), compilationProcessor); } - compilationProcesor.compile(); + compilationProcessor.compile(); for (int fi = 0; fi < numFilters; ++fi) { final WhereFilter filter = filters[fi]; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java index 95311871f96..b7f960eb5f8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/RemappedDataIndex.java @@ -6,6 +6,7 @@ import io.deephaven.base.verify.Assert; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.DataIndex; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.indexer.DataIndexer; import org.jetbrains.annotations.NotNull; @@ -91,14 +92,14 @@ public List keyColumnNames() { @Override @NotNull - public Table table() { - return sourceIndex.table(); + public Table table(final DataIndexOptions options) { + return sourceIndex.table(options); } @Override @NotNull - public RowKeyLookup rowKeyLookup() { - return sourceIndex.rowKeyLookup(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + return sourceIndex.rowKeyLookup(options); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java index b496c96171b..cf4eafbf9c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/StandaloneDataIndex.java @@ -6,6 +6,7 @@ import 
io.deephaven.engine.liveness.LivenessArtifact; import io.deephaven.engine.table.BasicDataIndex; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import org.jetbrains.annotations.NotNull; @@ -63,7 +64,7 @@ public String rowSetColumnName() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { return table; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java index 4802143a9a1..d2a9d8659a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TableBackedDataIndex.java @@ -9,6 +9,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.by.*; @@ -83,7 +84,7 @@ public Map, String> keyColumnNamesByIndexedColumn() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { Table localIndexTable; if ((localIndexTable = indexTable) != null) { return localIndexTable; @@ -136,8 +137,8 @@ private Table computeTable() { @Override @NotNull - public RowKeyLookup rowKeyLookup() { - table(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + table(options); return (final Object key, final boolean usePrev) -> { // Pass the object to the aggregation lookup, then return the resulting row key. This index will be // correct in prev or current space because of the aggregation's hash-based lookup. 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java index 7759adcaa49..c9f923566b4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java @@ -10,11 +10,7 @@ import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.WritableRowSet; -import io.deephaven.engine.table.BasicDataIndex; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.DataIndex; -import io.deephaven.engine.table.DataIndexTransformer; -import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.FunctionalColumn; @@ -72,7 +68,7 @@ public String rowSetColumnName() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { Table localIndexTable; if ((localIndexTable = indexTable) != null) { return localIndexTable; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java index 431d00ee7e7..356bd05f8b7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/analyzers/SelectColumnLayer.java @@ -191,7 +191,10 @@ public Runnable createUpdateHandler( // If we have shifts, that makes everything nasty; so we do not want to deal with it final boolean hasShifts = upstream.shifted().nonempty(); - final 
boolean serialTableOperationsSafe = updateGraph.serialTableOperationsSafe() + // liveResultOwner is only null when we are static; in the static case there is no need to + // worry about serial table operation checking + final boolean serialTableOperationsSafe = liveResultOwner == null + || updateGraph.serialTableOperationsSafe() || updateGraph.sharedLock().isHeldByCurrentThread() || updateGraph.exclusiveLock().isHeldByCurrentThread(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 881e8a21e88..a19923566c8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -4,17 +4,20 @@ package io.deephaven.engine.table.impl.sources.regioned; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.api.ColumnName; +import io.deephaven.api.Selectable; +import io.deephaven.base.stats.Counter; +import io.deephaven.base.stats.Stats; +import io.deephaven.base.stats.Value; import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; +import io.deephaven.configuration.Configuration; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetBuilderSequential; import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.table.BasicDataIndex; -import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.PartitionedTableFactory; -import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer; import io.deephaven.engine.table.impl.by.AggregationProcessor; import 
io.deephaven.engine.table.impl.by.AggregationRowLookup; @@ -23,14 +26,17 @@ import io.deephaven.engine.table.impl.locations.TableLocation; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.FunctionalColumn; +import io.deephaven.engine.table.impl.select.MultiSourceFunctionalColumn; import io.deephaven.engine.table.impl.select.SelectColumn; -import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.InternalUseOnly; import io.deephaven.vector.ObjectVector; import org.jetbrains.annotations.NotNull; import java.util.*; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; +import java.util.stream.LongStream; /** * DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a @@ -45,6 +51,21 @@ */ @InternalUseOnly class MergedDataIndex extends AbstractDataIndex implements DataIndexer.RetainableDataIndex { + /** + * The duration in nanos to build a DataIndex table. + */ + public static final Value buildIndexTable = Stats + .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in nanos of building an index").getValue(); + + /** + * When merging row sets from multiple component DataIndex structures, reading each individual component can be + * performed in parallel to improve the wall-clock I/O time observed. For cases where the number of individual + * groups to merge is small and I/O bound this can be very beneficial. Setting up the parallelism, however, can add + * additional overhead. Setting the Configuration property "MergedDataIndex.useParallelLazyFetch" to false disables + * this behavior and each key's merged RowSet is computed serially. 
+ */ + public static boolean USE_PARALLEL_LAZY_FETCH = Configuration.getInstance() + .getBooleanWithDefault("MergedDataIndex.useParallelLazyFetch", true); private static final String LOCATION_DATA_INDEX_TABLE_COLUMN_NAME = "__DataIndexTable"; @@ -63,6 +84,12 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl */ private volatile Table indexTable; + /** + * A lazy version of the table, note this value is never set if indexTable is set. A lazy table can be converted to + * an indexTable by selecting the rowset column. + */ + private volatile Table lazyTable; + /** * Whether this index is known to be corrupt. */ @@ -113,19 +140,29 @@ public Map, String> keyColumnNamesByIndexedColumn() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions options) { Table localIndexTable; if ((localIndexTable = indexTable) != null) { return localIndexTable; } + final boolean lazyRowsetMerge = options.operationUsesPartialTable(); + if (lazyRowsetMerge) { + if ((localIndexTable = lazyTable) != null) { + return localIndexTable; + } + } synchronized (this) { if ((localIndexTable = indexTable) != null) { return localIndexTable; + } else if (lazyRowsetMerge) { + if ((localIndexTable = lazyTable) != null) { + return localIndexTable; + } } try { return QueryPerformanceRecorder.withNugget( String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)), - ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable)); + ForkJoinPoolOperationInitializer.ensureParallelizable(() -> buildTable(lazyRowsetMerge))); } catch (Throwable t) { isCorrupt = true; throw t; @@ -133,45 +170,155 @@ public Table table() { } } - private Table buildTable() { - final Table locationTable = columnSourceManager.locationTable().coalesce(); + /** + * The RowSetCacher is a bit of a hack that allows us to avoid reading actual rowsets from disk until they are + * actually required for a query operation. 
We are breaking engine rules in that we reference the source + * ColumnSource directly and do not have correct dependencies encoded in a select. MergedDataIndexes are only + * permitted for a static table, so we can get away with this. + * + *

+ * Once a RowSet has been written, we write down the result into an AtomicReferenceArray so that it need not be read + * repeatedly. If two threads attempt to concurrently fetch the same rowset, then we use a placeholder object in the + * array to avoid duplicated work. + *

+ */ + private static class RowsetCacher { + final ColumnSource> source; + final AtomicReferenceArray results; - // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by - // the appropriate region offset. - // This potentially loads many small row sets into memory, but it avoids the risk of re-materializing row set - // pages during the accumulation phase. - final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); - final Table locationDataIndexes = locationTable - .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( - columnSourceManager.locationColumnName(), TableLocation.class, - LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, - (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( - locationRowKey, location, keyColumnNamesArray))))) - .dropColumns(columnSourceManager.locationColumnName()); - - // Merge all the location index tables into a single table - final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge(); - - // Group the merged data indexes by the keys - final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray); - - // Combine the row sets from each group into a single row set - final Table combined = groupedByKeyColumns - .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( + private RowsetCacher(final ColumnSource> source, final int capacity) { + this.source = source; + this.results = new AtomicReferenceArray<>(capacity); + } + + RowSet get(final long rowKey) { + if (rowKey < 0 || rowKey >= results.length()) { + return null; + } + + final int iRowKey = (int) rowKey; + do { + final Object localResult = results.get(iRowKey); + if (localResult instanceof RowSet) { + return (RowSet) localResult; + } + if (localResult instanceof Exception) { + throw new UncheckedDeephavenException("Exception found for cached RowSet", + (Exception) localResult); + } + + if (localResult 
!= null) { + // noinspection EmptySynchronizedStatement + synchronized (localResult) { + // don't care to do anything, we are just waiting for the barrier to be done + } + continue; + } + + // we need to create our own placeholder, and synchronize on it first + final ReentrantLock placeholder = new ReentrantLock(); + // noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (placeholder) { + if (!results.compareAndSet(iRowKey, null, placeholder)) { + // we must try again, someone else has claimed the placeholder + continue; + } + // it is our responsibility to get the right answer + final ObjectVector inputRowsets = source.get(rowKey); + Assert.neqNull(inputRowsets, "inputRowsets"); + assert inputRowsets != null; + + // need to get the value and set it into our own value + final RowSet computedResult; + try { + computedResult = mergeRowSets(rowKey, inputRowsets); + } catch (Exception e) { + if (!results.compareAndSet(iRowKey, placeholder, e)) { + throw new IllegalStateException("another thread changed our cache placeholder!"); + } + throw e; + } + if (!results.compareAndSet(iRowKey, placeholder, computedResult)) { + throw new IllegalStateException("another thread changed our cache placeholder!"); + } + + return computedResult; + } + } while (true); + } + } + + private Table buildTable(final boolean lazyRowsetMerge) { + if (lazyTable != null) { + if (lazyRowsetMerge) { + return lazyTable; + } else { + indexTable = lazyTable.select(); + lazyTable = null; + return indexTable; + } + } + + final long t0 = System.nanoTime(); + try { + final Table locationTable = columnSourceManager.locationTable().coalesce(); + + // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by + // the appropriate region offset. The row sets are not forced into memory; but keys are to enable efficient + // grouping. The rowsets are read into memory as part of the {@link #mergeRowSets} call. 
+ final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); + final Table locationDataIndexes = locationTable + .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( + columnSourceManager.locationColumnName(), TableLocation.class, + LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, + (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( + locationRowKey, location, keyColumnNamesArray))))) + .dropColumns(columnSourceManager.locationColumnName()); + + // Merge all the location index tables into a single table + final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge(); + + // Group the merged data indexes by the keys + final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray); + + final Table combined; + if (lazyRowsetMerge) { + final ColumnSource> vectorColumnSource = + groupedByKeyColumns.getColumnSource(ROW_SET_COLUMN_NAME); + + // we are using a regular array here; so we must ensure that we are flat; or we'll have a bad time + Assert.assertion(groupedByKeyColumns.isFlat(), "groupedByKeyColumns.isFlat()"); + final RowsetCacher rowsetCacher = new RowsetCacher(vectorColumnSource, groupedByKeyColumns.intSize()); + // need to do something better with a magic holder that looks at the rowset column source and lazily + // merges them instead of this version that actually wants to have an input and doesn't cache anything + combined = groupedByKeyColumns + .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(), + ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k))))); + } else { + // Combine the row sets from each group into a single row set + final List mergeFunction = List.of(SelectColumn.ofStateless(new FunctionalColumn<>( ROW_SET_COLUMN_NAME, ObjectVector.class, ROW_SET_COLUMN_NAME, RowSet.class, - this::mergeRowSets)))); - Assert.assertion(combined.isFlat(), "combined.isFlat()"); - 
Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()"); + MergedDataIndex::mergeRowSets))); - // Cleanup after ourselves - try (final CloseableIterator rowSets = mergedDataIndexes.objectColumnIterator(ROW_SET_COLUMN_NAME)) { - rowSets.forEachRemaining(SafeCloseable::close); - } + combined = groupedByKeyColumns.update(mergeFunction); + } + Assert.assertion(combined.isFlat(), "combined.isFlat()"); + Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()"); - lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns); - indexTable = combined; - return combined; + lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns); + + if (lazyRowsetMerge) { + lazyTable = combined; + } else { + indexTable = combined; + } + + return combined; + } finally { + final long t1 = System.nanoTime(); + buildIndexTable.sample(t1 - t0); + } } private static Table loadIndexTableAndShiftRowSets( @@ -184,27 +331,59 @@ private static Table loadIndexTableAndShiftRowSets( String.join(", ", keyColumnNames), location)); } final Table indexTable = dataIndex.table(); - return indexTable.coalesce().update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( - dataIndex.rowSetColumnName(), RowSet.class, - ROW_SET_COLUMN_NAME, RowSet.class, - (final RowSet rowSet) -> rowSet - .shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey))))))); + final long shiftAmount = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)); + final Table coalesced = indexTable.coalesce(); + + // pull the key columns into memory while we are parallel; + final Table withInMemorykeyColumns = coalesced.update(keyColumnNames); + + final Selectable shiftFunction; + if (shiftAmount == 0) { + shiftFunction = + Selectable.of(ColumnName.of(ROW_SET_COLUMN_NAME), ColumnName.of(dataIndex.rowSetColumnName())); + } else { + shiftFunction = new FunctionalColumn<>( + 
dataIndex.rowSetColumnName(), RowSet.class, + ROW_SET_COLUMN_NAME, RowSet.class, + (final RowSet rowSet) -> rowSet.shift(shiftAmount)); + } + // the rowset column shift need not occur until we perform the rowset merge operation - which is either + // lazy or part of an update [which itself can be parallel]. + return withInMemorykeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); } - private RowSet mergeRowSets( + private static RowSet mergeRowSets( @SuppressWarnings("unused") final long unusedRowKey, @NotNull final ObjectVector keyRowSets) { + final long numRowSets = keyRowSets.size(); + + if (numRowSets == 1) { + // we steal the reference, the input is never used again + return keyRowSets.get(0); + } final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); - try (final CloseableIterator rowSets = keyRowSets.iterator()) { - rowSets.forEachRemaining(builder::appendRowSequence); + + if (USE_PARALLEL_LAZY_FETCH) { + LongStream.range(0, numRowSets).parallel().mapToObj(keyRowSets::get) + .sorted(Comparator.comparingLong(RowSet::firstRowKey)).forEachOrdered(rs -> { + builder.appendRowSequence(rs); + rs.close(); + }); + } else { + try (final CloseableIterator rowSets = keyRowSets.iterator()) { + rowSets.forEachRemaining(rs -> { + builder.appendRowSequence(rs); + rs.close(); + }); + } } return builder.build(); } @Override @NotNull - public RowKeyLookup rowKeyLookup() { - table(); + public RowKeyLookup rowKeyLookup(final DataIndexOptions options) { + table(options); return (final Object key, final boolean usePrev) -> { // Pass the object to the aggregation lookup, then return the resulting row position (which is also the row // key). 
@@ -232,13 +411,8 @@ public boolean isValid() { final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); try (final CloseableIterator locations = columnSourceManager.locationTable().objectColumnIterator(columnSourceManager.locationColumnName())) { - while (locations.hasNext()) { - if (!locations.next().hasDataIndex(keyColumnNamesArray)) { - return isValid = false; - } - } + return isValid = locations.stream().parallel().allMatch(l -> l.hasDataIndex(keyColumnNamesArray)); } - return isValid = true; } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java index 4e7a9bb1a8d..1953470475e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/PartitioningColumnDataIndex.java @@ -8,6 +8,7 @@ import io.deephaven.base.verify.Require; import io.deephaven.engine.rowset.*; import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.DataIndexOptions; import io.deephaven.engine.table.ModifiedColumnSet; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; @@ -300,13 +301,13 @@ public Map, String> keyColumnNamesByIndexedColumn() { @Override @NotNull - public Table table() { + public Table table(final DataIndexOptions unused) { return indexTable; } @Override @NotNull - public RowKeyLookup rowKeyLookup() { + public RowKeyLookup rowKeyLookup(final DataIndexOptions unusedOptions) { return (final Object key, final boolean usePrev) -> keyPositionMap.get(key); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java 
b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java index 0aeb1b05a07..8a2d6817681 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java @@ -482,7 +482,7 @@ private DataIndexImpl(@NotNull final Table table) { } @Override - public @NotNull Table table() { + public @NotNull Table table(DataIndexOptions ignored) { return table; } From ef721a7fe8027b68d1dc561106771343d30f3d96 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 07:33:34 -0500 Subject: [PATCH 02/24] simple changes --- .../java/io/deephaven/base/AtomicUtil.java | 41 ++++++++++++++ .../java/io/deephaven/base/stats/Value.java | 45 ++++++++------- CONTRIBUTING.md | 2 +- .../engine/table/BasicDataIndex.java | 2 + .../io/deephaven/engine/table/DataIndex.java | 9 +++ .../engine/table/DataIndexOptions.java | 17 ++++-- .../table/impl/AbstractColumnSource.java | 29 ++++------ .../sources/regioned/MergedDataIndex.java | 56 ++++++++----------- 8 files changed, 125 insertions(+), 76 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/AtomicUtil.java b/Base/src/main/java/io/deephaven/base/AtomicUtil.java index 43aa96ac37b..866d11f5ee6 100644 --- a/Base/src/main/java/io/deephaven/base/AtomicUtil.java +++ b/Base/src/main/java/io/deephaven/base/AtomicUtil.java @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public abstract class AtomicUtil { @@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) { } while (!i.compareAndSet(expect, update)); return update; } + + /** + * Sets the field to the minimum of the current value and the passed in value + * + * @param o the object to update + * @param fu 
the field updater + * @param value the value that is a candidate for the minimum + * @return true if the minimum was set + * @param the type of o + */ + public static boolean setMin(final T o, final AtomicLongFieldUpdater fu, final long value) { + long current = fu.get(o); + while (current > value) { + if (fu.compareAndSet(o, current, value)) { + return true; + } + current = fu.get(o); + } + return false; + } + + /** + * Sets the field to the maximum of the current value and the passed in value + * + * @param o the object to update + * @param fu the field updater + * @param value the value that is a candidate for the maximum + * @return true if the maximum was set + * @param the type of o + */ + public static boolean setMax(final T o, final AtomicLongFieldUpdater fu, final long value) { + long current = fu.get(o); + while (value > current) { + if (fu.compareAndSet(o, current, value)) { + return true; + } + current = fu.get(o); + } + return false; + } } diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 65ee41424a1..92f9bfb2266 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -3,6 +3,8 @@ // package io.deephaven.base.stats; +import io.deephaven.base.AtomicUtil; + import java.text.DecimalFormat; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -12,13 +14,17 @@ public abstract class Value { AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); private static final AtomicLongFieldUpdater SUM2_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); - - volatile protected long n = 0; - volatile protected long last = 0; - volatile protected long sum = 0; - volatile protected long sum2 = 0; - volatile protected long max = Long.MIN_VALUE; - volatile protected long min = Long.MAX_VALUE; + private static final AtomicLongFieldUpdater MAX_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, 
"max"); + private static final AtomicLongFieldUpdater MIN_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "min"); + + protected volatile long n = 0; + protected volatile long last = 0; + protected volatile long sum = 0; + protected volatile long sum2 = 0; + protected volatile long max = Long.MIN_VALUE; + protected volatile long min = Long.MAX_VALUE; private boolean alwaysUpdated = false; @@ -65,10 +71,10 @@ public void sample(long x) { SUM2_UPDATER.addAndGet(this, x * x); last = x; if (x > max) { - max = x; + AtomicUtil.setMax(this, MAX_UPDATER, x); } if (x < min) { - min = x; + AtomicUtil.setMin(this, MAX_UPDATER, x); } } @@ -113,17 +119,14 @@ public String toString() { final DecimalFormat format = new DecimalFormat("#,###"); final DecimalFormat avgFormat = new DecimalFormat("#,###.###"); - final double variance = n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN; - - return "Value{" + - "n=" + format.format(n) + - (n > 0 ? ", sum=" + format.format(sum) + - ", max=" + format.format(max) + - ", min=" + format.format(min) + - ", avg=" + avgFormat.format((n > 0 ? (double) sum / n : Double.NaN)) + - ", std=" + avgFormat.format(Math.sqrt(variance)) - : "") - + - '}'; + + if (n > 0) { + final double std = Math.sqrt(n > 1 ? (sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN); + final double avg = (double) sum / n; + return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg, + std); + } else { + return String.format("Value{n=%,d}", n); + } } } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b1f17cddfcf..e82274c805a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -56,7 +56,7 @@ For more information, see: ## Labels -- Each pull request msut have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label. 
+- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label. - Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label. ## Styleguide diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index f1f09f096be..08453259398 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java @@ -102,6 +102,8 @@ default ColumnSource rowSetColumn() { /** * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}. * + * @param options required for building the Index table this ColumnSource is retrieved from + * * @return The {@link RowSet} {@link ColumnSource} */ @FinalDefault diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index 3dbdc8eeca7..8bea446650c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -52,6 +52,15 @@ default RowKeyLookup rowKeyLookup() { return rowKeyLookup(DataIndexOptions.DEFAULT); } + /** + * Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is + * {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be + * in the order of the index's key columns. 
+ * + * @param options required for building the table, if required by this RowKeyLookup + * + * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key + */ @NotNull RowKeyLookup rowKeyLookup(DataIndexOptions options); diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java index becd0948b2c..3b786c0bdeb 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -10,16 +10,20 @@ /** * Options for controlling the function of a {@link DataIndex}. * - *

- * Presently, this is used for the {@link Table#where(Filter)} operation to more efficiently handle data index matches, - * without necessarily reading all RowSet information from disk across partitions. - *

*/ @Value.Immutable @BuildableStyle public interface DataIndexOptions { + /** + * Static default options, which uses a full table. + */ DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); + /** + * Static options that uses a partial table instead of the full table. + */ + DataIndexOptions USE_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); + /** * Does this operation use only a subset of the DataIndex? * @@ -27,6 +31,11 @@ public interface DataIndexOptions { * The DataIndex implementation may use this hint to defer work for some row sets. *

* + *

+ * Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing + * {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred. + *

+ * * @return if this operation is only going to use a subset of this data index */ @Value.Default diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index eae1bf0cb5b..73660d2ed82 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -57,28 +57,28 @@ public abstract class AbstractColumnSource implements public static boolean USE_PARTIAL_TABLE_DATA_INDEX = Configuration.getInstance() .getBooleanWithDefault("AbstractColumnSource.usePartialDataIndex", true); /** - * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result Index + * After generating a DataIndex table and identifying which row keys are responsive to the filter, the result RowSet * can be built in serial or in parallel. By default, the index is built in parallel which may take advantage of * using more threads for I/O of the index data structure. Parallel builds do require more setup and thread * synchronization, so they can be disabled by setting the Configuration property * "AbstractColumnSource.useParallelIndexBuild" to false. */ - public static boolean USE_PARALLEL_INDEX_BUILD = Configuration.getInstance() - .getBooleanWithDefault("AbstractColumnSource.useParallelIndexBuild", true); + public static boolean USE_PARALLEL_ROWSET_BUILD = Configuration.getInstance() + .getBooleanWithDefault("AbstractColumnSource.useParallelRowSetBuild", true); /** * Duration of match() calls using a DataIndex (also provides the count). 
*/ - public static final Value indexFilter = + public static final Value INDEX_FILTER_MILLIS = Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY, - "Duration of match() with a DataIndex in nanos") + "Duration of match() with a DataIndex in millis") .getValue(); /** * Duration of match() calls using a chunk filter (i.e. no DataIndex). */ - public static final Value chunkFilter = + public static final Value CHUNK_FILTER_MILLIS = Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY, - "Duration of match() without a DataIndex in nanos") + "Duration of match() without a DataIndex in millis") .getValue(); /** @@ -89,13 +89,6 @@ public abstract class AbstractColumnSource implements private static final int CHUNK_SIZE = 1 << 11; - /** - * The match operation does not use the complete table, it just picks out the relevant indices so there is no reason - * to actually read it fully. - */ - private static final DataIndexOptions PARTIAL_TABLE_DATA_INDEX = - DataIndexOptions.builder().operationUsesPartialTable(true).build(); - protected final Class type; protected final Class componentType; @@ -173,7 +166,7 @@ public WritableRowSet match( return doDataIndexFilter(invertMatch, usePrev, caseInsensitive, dataIndex, rowsetToFilter, keys); } finally { final long t1 = System.nanoTime(); - indexFilter.sample(t1 - t0); + INDEX_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); } } @@ -184,7 +177,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch, @NotNull final RowSet rowsetToFilter, final Object[] keys) { final DataIndexOptions partialOption = - USE_PARTIAL_TABLE_DATA_INDEX ? PARTIAL_TABLE_DATA_INDEX : DataIndexOptions.DEFAULT; + USE_PARTIAL_TABLE_DATA_INDEX ? DataIndexOptions.USE_PARTIAL_TABLE : DataIndexOptions.DEFAULT; final Table indexTable = dataIndex.table(partialOption); @@ -242,7 +235,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch, ? 
dataIndex.rowSetColumn(partialOption).getPrevSource() : dataIndex.rowSetColumn(partialOption); - if (USE_PARALLEL_INDEX_BUILD) { + if (USE_PARALLEL_ROWSET_BUILD) { final long[] rowKeyArray = new long[matchingIndexRows.intSize()]; matchingIndexRows.toRowKeyArray(rowKeyArray); Arrays.stream(rowKeyArray).parallel().forEach((final long rowKey) -> { @@ -290,7 +283,7 @@ private WritableRowSet doChunkFilter(final boolean invertMatch, ChunkMatchFilterFactory.getChunkFilter(type, caseInsensitive, invertMatch, keys)); } finally { final long t1 = System.nanoTime(); - chunkFilter.sample(t1 - t0); + CHUNK_FILTER_MILLIS.sample((t1 - t0) / 1_000_000); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index a19923566c8..c974fc5f6d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -34,7 +34,6 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -54,8 +53,8 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl /** * The duration in nanos to build a DataIndex table. 
*/ - public static final Value buildIndexTable = Stats - .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in nanos of building an index").getValue(); + public static final Value BUILD_INDEX_TABLE_MILLIS = Stats + .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in millis of building an index").getValue(); /** * When merging row sets from multiple component DataIndex structures, reading each individual component can be @@ -85,8 +84,8 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl private volatile Table indexTable; /** - * A lazy version of the table, note this value is never set if indexTable is set. A lazy table can be converted to - * an indexTable by selecting the rowset column. + * A lazy version of the table. This value is never set if indexTable is set. Can be converted to indexTable by + * selecting the RowSet column. */ private volatile Table lazyTable; @@ -145,8 +144,8 @@ public Table table(final DataIndexOptions options) { if ((localIndexTable = indexTable) != null) { return localIndexTable; } - final boolean lazyRowsetMerge = options.operationUsesPartialTable(); - if (lazyRowsetMerge) { + final boolean lazyRowSetMerge = options.operationUsesPartialTable(); + if (lazyRowSetMerge) { if ((localIndexTable = lazyTable) != null) { return localIndexTable; } @@ -154,7 +153,7 @@ public Table table(final DataIndexOptions options) { synchronized (this) { if ((localIndexTable = indexTable) != null) { return localIndexTable; - } else if (lazyRowsetMerge) { + } else if (lazyRowSetMerge) { if ((localIndexTable = lazyTable) != null) { return localIndexTable; } @@ -162,7 +161,7 @@ public Table table(final DataIndexOptions options) { try { return QueryPerformanceRecorder.withNugget( String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)), - ForkJoinPoolOperationInitializer.ensureParallelizable(() -> buildTable(lazyRowsetMerge))); + ForkJoinPoolOperationInitializer.ensureParallelizable(() 
-> buildTable(lazyRowSetMerge))); } catch (Throwable t) { isCorrupt = true; throw t; @@ -182,11 +181,11 @@ public Table table(final DataIndexOptions options) { * array to avoid duplicated work. *

*/ - private static class RowsetCacher { + private static class RowSetCacher { final ColumnSource> source; final AtomicReferenceArray results; - private RowsetCacher(final ColumnSource> source, final int capacity) { + private RowSetCacher(final ColumnSource> source, final int capacity) { this.source = source; this.results = new AtomicReferenceArray<>(capacity); } @@ -216,7 +215,7 @@ RowSet get(final long rowKey) { } // we need to create our own placeholder, and synchronize on it first - final ReentrantLock placeholder = new ReentrantLock(); + final Object placeholder = new Object(); // noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (placeholder) { if (!results.compareAndSet(iRowKey, null, placeholder)) { @@ -226,22 +225,17 @@ RowSet get(final long rowKey) { // it is our responsibility to get the right answer final ObjectVector inputRowsets = source.get(rowKey); Assert.neqNull(inputRowsets, "inputRowsets"); - assert inputRowsets != null; // need to get the value and set it into our own value final RowSet computedResult; try { + // noinspection DataFlowIssue computedResult = mergeRowSets(rowKey, inputRowsets); } catch (Exception e) { - if (!results.compareAndSet(iRowKey, placeholder, e)) { - throw new IllegalStateException("another thread changed our cache placeholder!"); - } + results.set(iRowKey, e); throw e; } - if (!results.compareAndSet(iRowKey, placeholder, computedResult)) { - throw new IllegalStateException("another thread changed our cache placeholder!"); - } - + results.set(iRowKey, computedResult); return computedResult; } } while (true); @@ -252,11 +246,10 @@ private Table buildTable(final boolean lazyRowsetMerge) { if (lazyTable != null) { if (lazyRowsetMerge) { return lazyTable; - } else { - indexTable = lazyTable.select(); - lazyTable = null; - return indexTable; } + indexTable = lazyTable.select(); + lazyTable = null; + return indexTable; } final long t0 = System.nanoTime(); @@ -264,8 +257,9 @@ private Table buildTable(final 
boolean lazyRowsetMerge) { final Table locationTable = columnSourceManager.locationTable().coalesce(); // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by - // the appropriate region offset. The row sets are not forced into memory; but keys are to enable efficient - // grouping. The rowsets are read into memory as part of the {@link #mergeRowSets} call. + // the appropriate region offset. The row sets are not forced into memory, but keys are in order to enable + // efficient + // grouping. The rowsets are read into memory as part of the mergeRowSets call. final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); final Table locationDataIndexes = locationTable .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( @@ -288,9 +282,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { // we are using a regular array here; so we must ensure that we are flat; or we'll have a bad time Assert.assertion(groupedByKeyColumns.isFlat(), "groupedByKeyColumns.isFlat()"); - final RowsetCacher rowsetCacher = new RowsetCacher(vectorColumnSource, groupedByKeyColumns.intSize()); - // need to do something better with a magic holder that looks at the rowset column source and lazily - // merges them instead of this version that actually wants to have an input and doesn't cache anything + final RowSetCacher rowsetCacher = new RowSetCacher(vectorColumnSource, groupedByKeyColumns.intSize()); combined = groupedByKeyColumns .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(), ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k))))); @@ -317,7 +309,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { return combined; } finally { final long t1 = System.nanoTime(); - buildIndexTable.sample(t1 - t0); + BUILD_INDEX_TABLE_MILLIS.sample((t1 - t0) / 1_000_000); } } @@ -335,7 +327,7 @@ private static Table loadIndexTableAndShiftRowSets( final Table 
coalesced = indexTable.coalesce(); // pull the key columns into memory while we are parallel; - final Table withInMemorykeyColumns = coalesced.update(keyColumnNames); + final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); final Selectable shiftFunction; if (shiftAmount == 0) { @@ -349,7 +341,7 @@ private static Table loadIndexTableAndShiftRowSets( } // the rowset column shift need not occur until we perform the rowset merge operation - which is either // lazy or part of an update [which itself can be parallel]. - return withInMemorykeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); + return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); } private static RowSet mergeRowSets( From 10759ddade07fd5b2561cd532df2d87df84fca98 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 10:19:24 -0500 Subject: [PATCH 03/24] Fix minimum, add test. --- .../java/io/deephaven/base/stats/Value.java | 6 +--- .../io/deephaven/base/stats/TestValue.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 92f9bfb2266..3a659bd48f9 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -74,7 +74,7 @@ public void sample(long x) { AtomicUtil.setMax(this, MAX_UPDATER, x); } if (x < min) { - AtomicUtil.setMin(this, MAX_UPDATER, x); + AtomicUtil.setMin(this, MIN_UPDATER, x); } } @@ -116,10 +116,6 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon @Override public String toString() { - final DecimalFormat format = new DecimalFormat("#,###"); - final DecimalFormat avgFormat = new DecimalFormat("#,###.###"); - - if (n > 0) { final double std = Math.sqrt(n > 1 ? 
(sum2 - ((double) sum * sum / (double) n)) / (n - 1) : Double.NaN); final double avg = (double) sum / n; diff --git a/Base/src/test/java/io/deephaven/base/stats/TestValue.java b/Base/src/test/java/io/deephaven/base/stats/TestValue.java index 2b841c7ae3e..a629da56b68 100644 --- a/Base/src/test/java/io/deephaven/base/stats/TestValue.java +++ b/Base/src/test/java/io/deephaven/base/stats/TestValue.java @@ -5,6 +5,7 @@ import junit.framework.TestCase; +import java.util.concurrent.Semaphore; import java.util.function.LongFunction; // -------------------------------------------------------------------- @@ -49,6 +50,37 @@ public void testCounter() { checkValue(Counter.FACTORY); } + public void testToString() { + // we are testing toString, but also creating a pile of threads to exercise some of the AtomicFieldUpdater + // behavior of the value + final Counter counter = Counter.FACTORY.apply(0L); + + final String emptyString = counter.toString(); + assertEquals("Value{n=0}", emptyString); + + final Semaphore semaphore = new Semaphore(0); + final Semaphore completion = new Semaphore(0); + final int n = 100; + for (int ii = 0; ii < n; ++ii) { + final int fii = ii; + new Thread(() -> { + semaphore.acquireUninterruptibly(); + counter.sample(fii); + completion.release(); + }).start(); + } + semaphore.release(100); + completion.acquireUninterruptibly(100); + + assertEquals(n, counter.getN()); + assertEquals(((n - 1) * n) / 2, counter.getSum()); + assertEquals(0, counter.getMin()); + assertEquals(99, counter.getMax()); + + final String asString = counter.toString(); + assertEquals("Value{n=100, sum=4,950, max=99, min=0, avg=49.500, std=29.011}", asString); + } + // ---------------------------------------------------------------- private void checkValue(LongFunction factory) { Value value = factory.apply(1000L); From 34b77c121baa8346cd9f686c7aa86ee343b3f40d Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Wed, 22 Jan 2025 11:30:09 -0500 Subject: [PATCH 04/24] Split Counter into a thread-safe variant to avoid performance concerns in other parts of the code. --- .../base/stats/ThreadSafeCounter.java | 44 +++++++++++++++++++ .../deephaven/base/stats/ThreadSafeValue.java | 42 ++++++++++++++++++ .../java/io/deephaven/base/stats/Value.java | 23 +++------- .../io/deephaven/base/stats/TestValue.java | 9 +++- .../table/impl/AbstractColumnSource.java | 5 ++- .../sources/regioned/MergedDataIndex.java | 5 ++- 6 files changed, 107 insertions(+), 21 deletions(-) create mode 100644 Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java create mode 100644 Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java new file mode 100644 index 00000000000..dd30e0770e9 --- /dev/null +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java @@ -0,0 +1,44 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.base.stats; + +import java.util.function.LongFunction; + +// -------------------------------------------------------------------- + +/** + * A statistic where each value represents a additive quantity, and thus the sum of the values does have meaning. + * Examples include event counts and processing duration. If the sum of the values does not have a useful + * interpretation, use {@link State} instead. + *
    + *
  • {@link #increment} updates the counter, recording a single value. This is the most common usage. ({@link #sample} + * does exactly the same thing but is a poor verb to use with a Counter.) + *
  • {@link #incrementFromSample} updates the counter, recording a value that is the difference between this sample + * and the last sample. (The first call just sets the "last" sample and does not record a value.) For example, this can + * be used to CPU usage rate when only a running total is available by periodically sampling the running total. + *
+ */ +public class ThreadSafeCounter extends ThreadSafeValue { + + public static final char TYPE_TAG = 'C'; + + public ThreadSafeCounter(long now) { + super(now); + } + + long previousSample = Long.MIN_VALUE; + + public void incrementFromSample(long n) { + if (Long.MIN_VALUE != previousSample) { + sample(n - previousSample); + } + previousSample = n; + } + + public char getTypeTag() { + return TYPE_TAG; + } + + public static final LongFunction FACTORY = ThreadSafeCounter::new; +} diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java new file mode 100644 index 00000000000..8c473450875 --- /dev/null +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java @@ -0,0 +1,42 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.base.stats; + +import io.deephaven.base.AtomicUtil; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +public abstract class ThreadSafeValue extends Value { + private static final AtomicLongFieldUpdater N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n"); + private static final AtomicLongFieldUpdater SUM_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); + private static final AtomicLongFieldUpdater SUM2_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); + private static final AtomicLongFieldUpdater MAX_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "max"); + private static final AtomicLongFieldUpdater MIN_UPDATER = + AtomicLongFieldUpdater.newUpdater(Value.class, "min"); + + public ThreadSafeValue(long now) { + super(now); + } + + protected ThreadSafeValue(History history) { + super(history); + } + + @Override + public void sample(final long x) { + N_UPDATER.incrementAndGet(this); + SUM_UPDATER.addAndGet(this, x); + SUM2_UPDATER.addAndGet(this, x * x); + last = x; + if (x > max) { + AtomicUtil.setMax(this, MAX_UPDATER, x); + } + if 
(x < min) { + AtomicUtil.setMin(this, MIN_UPDATER, x); + } + } +} diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 3a659bd48f9..33468b4a985 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -9,16 +9,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; public abstract class Value { - private static final AtomicLongFieldUpdater N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n"); - private static final AtomicLongFieldUpdater SUM_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); - private static final AtomicLongFieldUpdater SUM2_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); - private static final AtomicLongFieldUpdater MAX_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "max"); - private static final AtomicLongFieldUpdater MIN_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "min"); - protected volatile long n = 0; protected volatile long last = 0; protected volatile long sum = 0; @@ -65,16 +55,17 @@ protected Value(History history) { this.history = history; } - public void sample(long x) { - N_UPDATER.incrementAndGet(this); - SUM_UPDATER.addAndGet(this, x); - SUM2_UPDATER.addAndGet(this, x * x); + @SuppressWarnings("NonAtomicOperationOnVolatileField") + public void sample(final long x) { + n++; + sum += x; + sum2 += x * x; last = x; if (x > max) { - AtomicUtil.setMax(this, MAX_UPDATER, x); + max = x; } if (x < min) { - AtomicUtil.setMin(this, MIN_UPDATER, x); + min = x; } } diff --git a/Base/src/test/java/io/deephaven/base/stats/TestValue.java b/Base/src/test/java/io/deephaven/base/stats/TestValue.java index a629da56b68..a1ab7eff36d 100644 --- a/Base/src/test/java/io/deephaven/base/stats/TestValue.java +++ b/Base/src/test/java/io/deephaven/base/stats/TestValue.java @@ -51,9 +51,16 @@ public void testCounter() { } public void 
testToString() { + // this is purposefully a heisentest, this should make it fail if it is really broken + for (int ii = 0; ii < 10; ++ii) { + doTestToString(); + } + } + + public void doTestToString() { // we are testing toString, but also creating a pile of threads to exercise some of the AtomicFieldUpdater // behavior of the value - final Counter counter = Counter.FACTORY.apply(0L); + final Value counter = ThreadSafeCounter.FACTORY.apply(0L); final String emptyString = counter.toString(); assertEquals("Value{n=0}", emptyString); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index 73660d2ed82..dbd3c3ba58d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -5,6 +5,7 @@ import io.deephaven.base.stats.Counter; import io.deephaven.base.stats.Stats; +import io.deephaven.base.stats.ThreadSafeCounter; import io.deephaven.base.stats.Value; import io.deephaven.base.string.cache.CharSequenceUtils; import io.deephaven.base.verify.Assert; @@ -70,14 +71,14 @@ public abstract class AbstractColumnSource implements * Duration of match() calls using a DataIndex (also provides the count). */ public static final Value INDEX_FILTER_MILLIS = - Stats.makeItem("AbstractColumnSource", "indexFilter", Counter.FACTORY, + Stats.makeItem("AbstractColumnSource", "indexFilter", ThreadSafeCounter.FACTORY, "Duration of match() with a DataIndex in millis") .getValue(); /** * Duration of match() calls using a chunk filter (i.e. no DataIndex). 
*/ public static final Value CHUNK_FILTER_MILLIS = - Stats.makeItem("AbstractColumnSource", "chunkFilter", Counter.FACTORY, + Stats.makeItem("AbstractColumnSource", "chunkFilter", ThreadSafeCounter.FACTORY, "Duration of match() without a DataIndex in millis") .getValue(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index c974fc5f6d5..284eed4f20a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -6,8 +6,8 @@ import io.deephaven.UncheckedDeephavenException; import io.deephaven.api.ColumnName; import io.deephaven.api.Selectable; -import io.deephaven.base.stats.Counter; import io.deephaven.base.stats.Stats; +import io.deephaven.base.stats.ThreadSafeCounter; import io.deephaven.base.stats.Value; import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; @@ -54,7 +54,8 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl * The duration in nanos to build a DataIndex table. */ public static final Value BUILD_INDEX_TABLE_MILLIS = Stats - .makeItem("DataIndex", "buildTable", Counter.FACTORY, "Duration in millis of building an index").getValue(); + .makeItem("DataIndex", "buildTable", ThreadSafeCounter.FACTORY, "Duration in millis of building an index") + .getValue(); /** * When merging row sets from multiple component DataIndex structures, reading each individual component can be From adbadfd81b3ef0308ca91a7abbbd6ea1f12c23d8 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 11:33:19 -0500 Subject: [PATCH 05/24] Add javadoc, remove unused imports. 
--- .../java/io/deephaven/base/stats/ThreadSafeValue.java | 8 ++++++++ Base/src/main/java/io/deephaven/base/stats/Value.java | 9 ++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java index 8c473450875..4766db3ca33 100644 --- a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java @@ -7,6 +7,14 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; +/** + * A thread-safe extension of the {@link Value} class. + * + *

+ * The {@link #sample(long)} method uses atomic CAS operations, so may introduce contention compared to the unsafe Value + * version of sample. + *

+ */ public abstract class ThreadSafeValue extends Value { private static final AtomicLongFieldUpdater N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n"); private static final AtomicLongFieldUpdater SUM_UPDATER = diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 33468b4a985..15c715403b1 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -3,12 +3,11 @@ // package io.deephaven.base.stats; -import io.deephaven.base.AtomicUtil; - -import java.text.DecimalFormat; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - public abstract class Value { + /** + * These members are volatile, the sample(long) method is not thread safe; and you can get wrong answers out of it. + * If you require safety, you should instead use a ThreadSafeValue. + */ protected volatile long n = 0; protected volatile long last = 0; protected volatile long sum = 0; From 7a956c5f607a34f7425575819f399ec77c543b31 Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Wed, 22 Jan 2025 12:02:02 -0500 Subject: [PATCH 06/24] update --- .../base/stats/ThreadSafeCounter.java | 24 ++++--------------- .../java/io/deephaven/base/stats/Value.java | 2 +- .../engine/table/DataIndexOptions.java | 2 +- .../table/impl/AbstractColumnSource.java | 11 ++++----- .../sources/regioned/MergedDataIndex.java | 5 ++-- 5 files changed, 15 insertions(+), 29 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java index dd30e0770e9..d66756187d3 100644 --- a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java @@ -8,36 +8,22 @@ // -------------------------------------------------------------------- /** - * A statistic where each value represents a additive quantity, and thus the sum of the values does have meaning. - * Examples include event counts and processing duration. If the sum of the values does not have a useful - * interpretation, use {@link State} instead. + * A statistic where each value represents an additive quantity, and thus the sum of the values does have + * meaning. Examples include event counts and processing duration. If the sum of the values does not have a + * useful interpretation, use {@link State} instead. *
    *
  • {@link #increment} updates the counter, recording a single value. This is the most common usage. ({@link #sample} * does exactly the same thing but is a poor verb to use with a Counter.) - *
  • {@link #incrementFromSample} updates the counter, recording a value that is the difference between this sample - * and the last sample. (The first call just sets the "last" sample and does not record a value.) For example, this can - * be used to CPU usage rate when only a running total is available by periodically sampling the running total. *
*/ public class ThreadSafeCounter extends ThreadSafeValue { - public static final char TYPE_TAG = 'C'; - - public ThreadSafeCounter(long now) { + public ThreadSafeCounter(final long now) { super(now); } - long previousSample = Long.MIN_VALUE; - - public void incrementFromSample(long n) { - if (Long.MIN_VALUE != previousSample) { - sample(n - previousSample); - } - previousSample = n; - } - public char getTypeTag() { - return TYPE_TAG; + return Counter.TYPE_TAG; } public static final LongFunction FACTORY = ThreadSafeCounter::new; diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 15c715403b1..3d277f9da9a 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -57,9 +57,9 @@ protected Value(History history) { @SuppressWarnings("NonAtomicOperationOnVolatileField") public void sample(final long x) { n++; + last = x; sum += x; sum2 += x * x; - last = x; if (x > max) { max = x; } diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java index 3b786c0bdeb..9fc4a988c03 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -22,7 +22,7 @@ public interface DataIndexOptions { /** * Static options that uses a partial table instead of the full table. */ - DataIndexOptions USE_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); + DataIndexOptions USING_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); /** * Does this operation use only a subset of the DataIndex? 
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java index dbd3c3ba58d..2de486cbd02 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractColumnSource.java @@ -3,7 +3,6 @@ // package io.deephaven.engine.table.impl; -import io.deephaven.base.stats.Counter; import io.deephaven.base.stats.Stats; import io.deephaven.base.stats.ThreadSafeCounter; import io.deephaven.base.stats.Value; @@ -70,15 +69,15 @@ public abstract class AbstractColumnSource implements /** * Duration of match() calls using a DataIndex (also provides the count). */ - public static final Value INDEX_FILTER_MILLIS = - Stats.makeItem("AbstractColumnSource", "indexFilter", ThreadSafeCounter.FACTORY, + private static final Value INDEX_FILTER_MILLIS = + Stats.makeItem("AbstractColumnSource", "indexFilterMillis", ThreadSafeCounter.FACTORY, "Duration of match() with a DataIndex in millis") .getValue(); /** * Duration of match() calls using a chunk filter (i.e. no DataIndex). */ - public static final Value CHUNK_FILTER_MILLIS = - Stats.makeItem("AbstractColumnSource", "chunkFilter", ThreadSafeCounter.FACTORY, + private static final Value CHUNK_FILTER_MILLIS = + Stats.makeItem("AbstractColumnSource", "chunkFilterMillis", ThreadSafeCounter.FACTORY, "Duration of match() without a DataIndex in millis") .getValue(); @@ -178,7 +177,7 @@ private WritableRowSet doDataIndexFilter(final boolean invertMatch, @NotNull final RowSet rowsetToFilter, final Object[] keys) { final DataIndexOptions partialOption = - USE_PARTIAL_TABLE_DATA_INDEX ? DataIndexOptions.USE_PARTIAL_TABLE : DataIndexOptions.DEFAULT; + USE_PARTIAL_TABLE_DATA_INDEX ? 
DataIndexOptions.USING_PARTIAL_TABLE : DataIndexOptions.DEFAULT; final Table indexTable = dataIndex.table(partialOption); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 284eed4f20a..77bdb65b50f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -53,8 +53,9 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl /** * The duration in nanos to build a DataIndex table. */ - public static final Value BUILD_INDEX_TABLE_MILLIS = Stats - .makeItem("DataIndex", "buildTable", ThreadSafeCounter.FACTORY, "Duration in millis of building an index") + private static final Value BUILD_INDEX_TABLE_MILLIS = Stats + .makeItem("MergedDataIndex", "buildTableMillis", ThreadSafeCounter.FACTORY, + "Duration in millis of building an index") .getValue(); /** From f184f5312f69a133691883b36d5f2556d1c0f7cf Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 13:28:51 -0500 Subject: [PATCH 07/24] Code review updates. 
--- .../sources/regioned/MergedDataIndex.java | 163 +++++++++++------- .../TestRegionedColumnSourceManager.java | 14 +- 2 files changed, 118 insertions(+), 59 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 77bdb65b50f..481018bdeb5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -91,6 +91,12 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl */ private volatile Table lazyTable; + /** + * If we have a lazy version of the table, we hold onto the partitioned table, which permits us to select the + * underlying rowsets without needing to reselect the key columns. + */ + private volatile PartitionedTable lazyPartitionedTable; + /** * Whether this index is known to be corrupt. */ @@ -172,14 +178,14 @@ public Table table(final DataIndexOptions options) { } /** - * The RowSetCacher is a bit of a hack that allows us to avoid reading actual rowsets from disk until they are + * The RowSetCacher is a bit of a hack that allows us to avoid reading actual RowSets from disk until they are * actually required for a query operation. We are breaking engine rules in that we reference the source * ColumnSource directly and do not have correct dependencies encoded in a select. MergedDataIndexes are only * permitted for a static table, so we can get away with this. * *

* Once a RowSet has been written, we write down the result into an AtomicReferenceArray so that it need not be read - * repeatedly. If two threads attempt to concurrently fetch the same rowset, then we use a placeholder object in the + * repeatedly. If two threads attempt to concurrently fetch the same RowSet, then we use a placeholder object in the * array to avoid duplicated work. *

*/ @@ -224,15 +230,19 @@ RowSet get(final long rowKey) { // we must try again, someone else has claimed the placeholder continue; } - // it is our responsibility to get the right answer - final ObjectVector inputRowsets = source.get(rowKey); - Assert.neqNull(inputRowsets, "inputRowsets"); + // We won the race, so it is our responsibility to get the right answer - // need to get the value and set it into our own value + final ObjectVector inputRowSets = source.get(rowKey); + Assert.neqNull(inputRowSets, "inputRowSets"); final RowSet computedResult; try { - // noinspection DataFlowIssue - computedResult = mergeRowSets(rowKey, inputRowsets); + if (USE_PARALLEL_LAZY_FETCH) { + //noinspection DataFlowIssue + computedResult = mergeRowSetsParallel(rowKey, inputRowSets); + } else { + //noinspection DataFlowIssue + computedResult = mergeRowSetsSerial(rowKey, inputRowSets); + } } catch (Exception e) { results.set(iRowKey, e); throw e; @@ -249,33 +259,24 @@ private Table buildTable(final boolean lazyRowsetMerge) { if (lazyRowsetMerge) { return lazyTable; } - indexTable = lazyTable.select(); - lazyTable = null; - return indexTable; } final long t0 = System.nanoTime(); try { - final Table locationTable = columnSourceManager.locationTable().coalesce(); - - // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by - // the appropriate region offset. The row sets are not forced into memory, but keys are in order to enable - // efficient - // grouping. The rowsets are read into memory as part of the mergeRowSets call. 
- final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); - final Table locationDataIndexes = locationTable - .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( - columnSourceManager.locationColumnName(), TableLocation.class, - LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, - (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( - locationRowKey, location, keyColumnNamesArray))))) - .dropColumns(columnSourceManager.locationColumnName()); + final PartitionedTable partitionedTable; + if (lazyPartitionedTable != null) { + // We are synchronized, and can begin processing from the PartitionedTable rather than starting from + // scratch. The first step is to force our rowsets into memory, in parallel. + partitionedTable = lazyPartitionedTable.transform(t -> t.update(ROW_SET_COLUMN_NAME)); + } else { + partitionedTable = buildPartitionedTable(lazyRowsetMerge); + } // Merge all the location index tables into a single table - final Table mergedDataIndexes = PartitionedTableFactory.of(locationDataIndexes).merge(); - + final Table mergedDataIndexes = partitionedTable.merge(); // Group the merged data indexes by the keys - final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNamesArray); + final Table groupedByKeyColumns = mergedDataIndexes.groupBy(keyColumnNames.toArray(String[]::new)); + lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns); final Table combined; if (lazyRowsetMerge) { @@ -288,25 +289,25 @@ private Table buildTable(final boolean lazyRowsetMerge) { combined = groupedByKeyColumns .view(List.of(SelectColumn.ofStateless(new MultiSourceFunctionalColumn<>(List.of(), ROW_SET_COLUMN_NAME, RowSet.class, (k, v) -> rowsetCacher.get(k))))); + + lazyPartitionedTable = partitionedTable; + lazyTable = combined; } else { // Combine the row sets from each group into a single row set final List mergeFunction = List.of(SelectColumn.ofStateless(new FunctionalColumn<>( 
ROW_SET_COLUMN_NAME, ObjectVector.class, ROW_SET_COLUMN_NAME, RowSet.class, - MergedDataIndex::mergeRowSets))); + MergedDataIndex::mergeRowSetsSerial))); combined = groupedByKeyColumns.update(mergeFunction); - } - Assert.assertion(combined.isFlat(), "combined.isFlat()"); - Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()"); - - lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns); - if (lazyRowsetMerge) { - lazyTable = combined; - } else { indexTable = combined; + // if we were built off a lazy table, null it out now that we've setup the indexTable + lazyPartitionedTable = null; + lazyTable = null; } + Assert.assertion(combined.isFlat(), "combined.isFlat()"); + Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()"); return combined; } finally { @@ -315,10 +316,28 @@ private Table buildTable(final boolean lazyRowsetMerge) { } } + private PartitionedTable buildPartitionedTable(boolean lazyRowsetMerge) { + final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); + final Table locationTable = columnSourceManager.locationTable().coalesce(); + // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by + // the appropriate region offset. The row sets are not forced into memory, but keys are in order to enable + // efficient grouping. The rowsets are read into memory as part of the mergeRowSets call. 
+ final Table locationDataIndexes = locationTable + .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( + columnSourceManager.locationColumnName(), TableLocation.class, + LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, + (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( + locationRowKey, location, keyColumnNamesArray, !lazyRowsetMerge))))) + .dropColumns(columnSourceManager.locationColumnName()); + + return PartitionedTableFactory.of(locationDataIndexes); + } + private static Table loadIndexTableAndShiftRowSets( final long locationRowKey, @NotNull final TableLocation location, - @NotNull final String[] keyColumnNames) { + @NotNull final String[] keyColumnNames, + final boolean selectRowSets) { final BasicDataIndex dataIndex = location.getDataIndex(keyColumnNames); if (dataIndex == null) { throw new UncheckedDeephavenException(String.format("Failed to load data index [%s] for location %s", @@ -328,25 +347,37 @@ private static Table loadIndexTableAndShiftRowSets( final long shiftAmount = RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)); final Table coalesced = indexTable.coalesce(); - // pull the key columns into memory while we are parallel; - final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); - final Selectable shiftFunction; if (shiftAmount == 0) { - shiftFunction = - Selectable.of(ColumnName.of(ROW_SET_COLUMN_NAME), ColumnName.of(dataIndex.rowSetColumnName())); + // A source column would be more convenient, but we are going to close the RowSet after we are done merging + // it and should not allow that close call to pass through to the original table. 
+ shiftFunction = new FunctionalColumn<>( + dataIndex.rowSetColumnName(), RowSet.class, + ROW_SET_COLUMN_NAME, RowSet.class, + RowSet::copy); } else { shiftFunction = new FunctionalColumn<>( dataIndex.rowSetColumnName(), RowSet.class, ROW_SET_COLUMN_NAME, RowSet.class, (final RowSet rowSet) -> rowSet.shift(shiftAmount)); } - // the rowset column shift need not occur until we perform the rowset merge operation - which is either - // lazy or part of an update [which itself can be parallel]. - return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); + if (selectRowSets) { + // pull the key columns into memory while we are parallel; and read all the rowsets (by virtue of the shift) + final List columns = new ArrayList<>(keyColumnNames.length + 1); + Arrays.stream(keyColumnNames).map(ColumnName::of).forEach(columns::add); + columns.add(SelectColumn.ofStateless(shiftFunction)); + return coalesced.update(columns); + } else { + // pull the key columns into memory while we are parallel; but do not read all the RowSets + final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); + return withInMemoryKeyColumns.update(List.of(SelectColumn.ofStateless(shiftFunction))); + } } - private static RowSet mergeRowSets( + /** + * The returned RowSet is owned by the caller. The input RowSets are closed. 
+ */ + private static RowSet mergeRowSetsSerial( @SuppressWarnings("unused") final long unusedRowKey, @NotNull final ObjectVector keyRowSets) { final long numRowSets = keyRowSets.size(); @@ -355,22 +386,38 @@ private static RowSet mergeRowSets( // we steal the reference, the input is never used again return keyRowSets.get(0); } + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + try (final CloseableIterator rowSets = keyRowSets.iterator()) { + rowSets.forEachRemaining(rs -> { + builder.appendRowSequence(rs); + rs.close(); + }); + } - if (USE_PARALLEL_LAZY_FETCH) { - LongStream.range(0, numRowSets).parallel().mapToObj(keyRowSets::get) - .sorted(Comparator.comparingLong(RowSet::firstRowKey)).forEachOrdered(rs -> { - builder.appendRowSequence(rs); - rs.close(); - }); - } else { - try (final CloseableIterator rowSets = keyRowSets.iterator()) { - rowSets.forEachRemaining(rs -> { + return builder.build(); + } + + /** + * The returned RowSet is owned by the caller. The input RowSets are closed. 
+ */ + private static RowSet mergeRowSetsParallel( + @SuppressWarnings("unused") final long unusedRowKey, + @NotNull final ObjectVector keyRowSets) { + final long numRowSets = keyRowSets.size(); + + if (numRowSets == 1) { + // we steal the reference, the input is never used again + return keyRowSets.get(0); + } + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + + LongStream.range(0, numRowSets).parallel().mapToObj(keyRowSets::get) + .sorted(Comparator.comparingLong(RowSet::firstRowKey)).forEachOrdered(rs -> { builder.appendRowSequence(rs); rs.close(); }); - } - } + return builder.build(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java index 8a2d6817681..670e59602b2 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java @@ -388,6 +388,15 @@ private static void checkIndex( @Test public void testStaticBasics() { + testStaticBasics(DataIndexOptions.DEFAULT); + } + + @Test + public void testStaticBasicsPartial() { + testStaticBasics(DataIndexOptions.USING_PARTIAL_TABLE); + } + + private void testStaticBasics(final DataIndexOptions options) { SUT = new RegionedColumnSourceManager(false, componentFactory, ColumnToCodecMappings.EMPTY, columnDefinitions); assertEquals(makeColumnSourceMap(), SUT.getColumnSources()); @@ -451,7 +460,10 @@ public void testStaticBasics() { } captureIndexes(SUT.initialize()); - capturedGroupingColumnIndex.table(); // Force us to build the merged index *before* we check satisfaction + + // Force us to build the merged index *before* we check satisfaction + // the checkIndexes method will call table() a second time with the DEFAULT options; which 
exercises lazy conversion + capturedGroupingColumnIndex.table(options); checkIndexes(); assertEquals(Arrays.asList(tableLocation1A, tableLocation1B), SUT.includedLocations()); From 4f2fb67ec3e7185cdf596ce6e75fd8a651e3d2b9 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 13:29:02 -0500 Subject: [PATCH 08/24] spotless --- .../engine/table/impl/sources/regioned/MergedDataIndex.java | 6 +++--- .../sources/regioned/TestRegionedColumnSourceManager.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 481018bdeb5..9e680008fb0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -237,10 +237,10 @@ RowSet get(final long rowKey) { final RowSet computedResult; try { if (USE_PARALLEL_LAZY_FETCH) { - //noinspection DataFlowIssue + // noinspection DataFlowIssue computedResult = mergeRowSetsParallel(rowKey, inputRowSets); } else { - //noinspection DataFlowIssue + // noinspection DataFlowIssue computedResult = mergeRowSetsSerial(rowKey, inputRowSets); } } catch (Exception e) { @@ -266,7 +266,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { final PartitionedTable partitionedTable; if (lazyPartitionedTable != null) { // We are synchronized, and can begin processing from the PartitionedTable rather than starting from - // scratch. The first step is to force our rowsets into memory, in parallel. + // scratch. The first step is to force our rowsets into memory, in parallel. 
partitionedTable = lazyPartitionedTable.transform(t -> t.update(ROW_SET_COLUMN_NAME)); } else { partitionedTable = buildPartitionedTable(lazyRowsetMerge); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java index 670e59602b2..7d3d2e7a08b 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/regioned/TestRegionedColumnSourceManager.java @@ -462,7 +462,8 @@ private void testStaticBasics(final DataIndexOptions options) { captureIndexes(SUT.initialize()); // Force us to build the merged index *before* we check satisfaction - // the checkIndexes method will call table() a second time with the DEFAULT options; which exercises lazy conversion + // the checkIndexes method will call table() a second time with the DEFAULT options; which exercises lazy + // conversion capturedGroupingColumnIndex.table(options); checkIndexes(); From 1d37c8c199e5ad736c6ae487b316680227506cdf Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 13:48:35 -0500 Subject: [PATCH 09/24] Add Join Property, remove unused properties. 
--- .../benchmark/engine/GroupByBenchmark.java | 14 -------------- .../benchmark/engine/LastByBenchmark.java | 5 ----- .../engine/table/impl/JoinControl.java | 4 ++++ .../deephaven/engine/table/impl/QueryTable.java | 17 +++++++---------- 4 files changed, 11 insertions(+), 29 deletions(-) diff --git a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java index 50af9c1fff5..c544aeb41ce 100644 --- a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java +++ b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/GroupByBenchmark.java @@ -38,9 +38,6 @@ public class GroupByBenchmark { @Param({"Historical"}) private String tableType; - @Param({"operator"}) - private String mode; - @Param({"String", "Int", "Composite"}) private String keyType; @@ -122,17 +119,6 @@ public void setupEnv(BenchmarkParams params) { state = new TableBenchmarkState(BenchmarkTools.stripName(params.getBenchmark()), params.getWarmup().getCount()); table = bmt.getTable().coalesce().dropColumns("PartCol"); - - switch (mode) { - case "chunked": - QueryTable.USE_OLDER_CHUNKED_BY = true; - break; - case "operator": - QueryTable.USE_OLDER_CHUNKED_BY = false; - break; - default: - throw new IllegalArgumentException("Unknown mode " + mode); - } } @TearDown(Level.Trial) diff --git a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java index c3dcc331c8e..c0ee0ada285 100644 --- a/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java +++ b/engine/benchmark/src/benchmark/java/io/deephaven/benchmark/engine/LastByBenchmark.java @@ -59,9 +59,6 @@ public class LastByBenchmark { @Param({"1"}) private int valueCount; - @Param({"true"}) - private boolean tracked; - private Table table; private 
String keyName; @@ -171,8 +168,6 @@ public void setupEnv(BenchmarkParams params) { state = new TableBenchmarkState(BenchmarkTools.stripName(params.getBenchmark()), params.getWarmup().getCount()); table = bmt.getTable().coalesce().dropColumns("PartCol"); - - QueryTable.TRACKED_FIRST_BY = QueryTable.TRACKED_LAST_BY = tracked; } @TearDown(Level.Trial) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java index 57af02c692f..cebd69e1365 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/JoinControl.java @@ -59,6 +59,10 @@ int tableSize(final long expectedEntries) { @Nullable DataIndex dataIndexToUse(Table table, ColumnSource[] sources) { + // Configuration property that serves as an escape hatch + if (!QueryTable.USE_DATA_INDEX_FOR_JOINS) { + return null; + } final DataIndexer indexer = DataIndexer.existingOf(table.getRowSet()); return indexer == null ? 
null : LivenessScopeStack.computeEnclosed( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index b5cec945450..fb86274458b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -211,6 +211,13 @@ public interface MemoizableOperation Date: Wed, 22 Jan 2025 13:49:58 -0500 Subject: [PATCH 10/24] spotless --- .../main/java/io/deephaven/engine/table/impl/QueryTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index fb86274458b..ff642fe80a6 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -212,8 +212,8 @@ public interface MemoizableOperation Date: Wed, 22 Jan 2025 15:43:23 -0500 Subject: [PATCH 11/24] actually use updateView as intended. --- .../engine/table/impl/sources/regioned/MergedDataIndex.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 9e680008fb0..a244221853b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -86,8 +86,7 @@ class MergedDataIndex extends AbstractDataIndex implements DataIndexer.Retainabl private volatile Table indexTable; /** - * A lazy version of the table. This value is never set if indexTable is set. 
Can be converted to indexTable by - * selecting the RowSet column. + * A lazy version of the table. This value is never set if indexTable is set. */ private volatile Table lazyTable; @@ -370,7 +369,7 @@ private static Table loadIndexTableAndShiftRowSets( } else { // pull the key columns into memory while we are parallel; but do not read all the RowSets final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); - return withInMemoryKeyColumns.update(List.of(SelectColumn.ofStateless(shiftFunction))); + return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); } } From 63b3733b2f0eff929d0122aef13a297da432fae5 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 15:55:06 -0500 Subject: [PATCH 12/24] update comment. --- .../impl/sources/regioned/MergedDataIndex.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index a244221853b..679e95ab4e1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -315,18 +315,24 @@ private Table buildTable(final boolean lazyRowsetMerge) { } } - private PartitionedTable buildPartitionedTable(boolean lazyRowsetMerge) { + /** + * Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by the + * appropriate region offset. The keys are always forced into memory so that the groupBy operation need not read + * from disk serially. + *

+ * If lazyRowSetMerge is true, the rowsets are read into memory as part of the mergeRowSets call and are not forced + * into memory here. If lazyRowSetMerge is false, then the RowSets are also forced into memory. + *

+ */ + private PartitionedTable buildPartitionedTable(final boolean lazyRowSetMerge) { final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new); final Table locationTable = columnSourceManager.locationTable().coalesce(); - // Perform a parallelizable update to produce coalesced location index tables with their row sets shifted by - // the appropriate region offset. The row sets are not forced into memory, but keys are in order to enable - // efficient grouping. The rowsets are read into memory as part of the mergeRowSets call. final Table locationDataIndexes = locationTable .update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>( columnSourceManager.locationColumnName(), TableLocation.class, LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class, (final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets( - locationRowKey, location, keyColumnNamesArray, !lazyRowsetMerge))))) + locationRowKey, location, keyColumnNamesArray, !lazyRowSetMerge))))) .dropColumns(columnSourceManager.locationColumnName()); return PartitionedTableFactory.of(locationDataIndexes); From 99a2efc65a7e9c4134456a4f4d9c836531f2d655 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 22 Jan 2025 15:55:24 -0500 Subject: [PATCH 13/24] update var name. 
--- .../table/impl/sources/regioned/MergedDataIndex.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 679e95ab4e1..be83c7d330d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -253,9 +253,9 @@ RowSet get(final long rowKey) { } } - private Table buildTable(final boolean lazyRowsetMerge) { + private Table buildTable(final boolean lazyRowSetMerge) { if (lazyTable != null) { - if (lazyRowsetMerge) { + if (lazyRowSetMerge) { return lazyTable; } } @@ -268,7 +268,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { // scratch. The first step is to force our rowsets into memory, in parallel. partitionedTable = lazyPartitionedTable.transform(t -> t.update(ROW_SET_COLUMN_NAME)); } else { - partitionedTable = buildPartitionedTable(lazyRowsetMerge); + partitionedTable = buildPartitionedTable(lazyRowSetMerge); } // Merge all the location index tables into a single table @@ -278,7 +278,7 @@ private Table buildTable(final boolean lazyRowsetMerge) { lookupFunction = AggregationProcessor.getRowLookup(groupedByKeyColumns); final Table combined; - if (lazyRowsetMerge) { + if (lazyRowSetMerge) { final ColumnSource> vectorColumnSource = groupedByKeyColumns.getColumnSource(ROW_SET_COLUMN_NAME); From a14f3fc311729b3f1c6e4c1aa6f2267c1b9b23b3 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 15:24:42 -0500 Subject: [PATCH 14/24] Synchronization instead. 
--- .../deephaven/base/stats/ThreadSafeValue.java | 27 +++++-------------- .../java/io/deephaven/base/stats/Value.java | 16 +++++------ 2 files changed, 14 insertions(+), 29 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java index 4766db3ca33..fe4b3fa4cc5 100644 --- a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java @@ -16,15 +16,6 @@ *

*/ public abstract class ThreadSafeValue extends Value { - private static final AtomicLongFieldUpdater N_UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "n"); - private static final AtomicLongFieldUpdater SUM_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "sum"); - private static final AtomicLongFieldUpdater SUM2_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "sum2"); - private static final AtomicLongFieldUpdater MAX_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "max"); - private static final AtomicLongFieldUpdater MIN_UPDATER = - AtomicLongFieldUpdater.newUpdater(Value.class, "min"); public ThreadSafeValue(long now) { super(now); @@ -35,16 +26,12 @@ protected ThreadSafeValue(History history) { } @Override - public void sample(final long x) { - N_UPDATER.incrementAndGet(this); - SUM_UPDATER.addAndGet(this, x); - SUM2_UPDATER.addAndGet(this, x * x); - last = x; - if (x > max) { - AtomicUtil.setMax(this, MAX_UPDATER, x); - } - if (x < min) { - AtomicUtil.setMin(this, MIN_UPDATER, x); - } + public synchronized void sample(final long x) { + super.sample(x); + } + + @Override + public synchronized String toString() { + return super.toString(); } } diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 3d277f9da9a..938f20c39ba 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -8,12 +8,12 @@ public abstract class Value { * These members are volatile, the sample(long) method is not thread safe; and you can get wrong answers out of it. * If you require safety, you should instead use a ThreadSafeValue. 
*/ - protected volatile long n = 0; - protected volatile long last = 0; - protected volatile long sum = 0; - protected volatile long sum2 = 0; - protected volatile long max = Long.MIN_VALUE; - protected volatile long min = Long.MAX_VALUE; + protected long n = 0; + protected long last = 0; + protected long sum = 0; + protected long sum2 = 0; + protected long max = Long.MIN_VALUE; + protected long min = Long.MAX_VALUE; private boolean alwaysUpdated = false; @@ -54,7 +54,6 @@ protected Value(History history) { this.history = history; } - @SuppressWarnings("NonAtomicOperationOnVolatileField") public void sample(final long x) { n++; last = x; @@ -111,8 +110,7 @@ public String toString() { final double avg = (double) sum / n; return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg, std); - } else { - return String.format("Value{n=%,d}", n); } + return String.format("Value{n=%,d}", n); } } From 2e5a9b494040cbef0bdebc5706f037d5433367ca Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 15:34:45 -0500 Subject: [PATCH 15/24] comment --- Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java index fe4b3fa4cc5..a215fb39a91 100644 --- a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java @@ -11,7 +11,7 @@ * A thread-safe extension of the {@link Value} class. * *

- * The {@link #sample(long)} method uses atomic CAS operations, so may introduce contention compared to the unsafe Value + * The {@link #sample(long)} method is synchronized, so may introduce contention compared to the unsafe Value * version of sample. *

*/ From 638d3e1773f71fdab97477ea70fd0f316a2fa956 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 15:35:50 -0500 Subject: [PATCH 16/24] fix --- Base/src/main/java/io/deephaven/base/stats/Value.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 938f20c39ba..31e8af6316e 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -5,7 +5,7 @@ public abstract class Value { /** - * These members are volatile, the sample(long) method is not thread safe; and you can get wrong answers out of it. + * The sample(long) method is not thread safe; and you can get wrong answers out of it. * If you require safety, you should instead use a ThreadSafeValue. */ protected long n = 0; From 93a8a90ab469415f6dfadffe81203f6df1e3a8c8 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 16:19:16 -0500 Subject: [PATCH 17/24] spotless --- .../main/java/io/deephaven/base/stats/ThreadSafeValue.java | 4 ++-- Base/src/main/java/io/deephaven/base/stats/Value.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java index a215fb39a91..b0cee5a2d14 100644 --- a/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java +++ b/Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java @@ -11,8 +11,8 @@ * A thread-safe extension of the {@link Value} class. * *

- * The {@link #sample(long)} method is synchronized, so may introduce contention compared to the unsafe Value - * version of sample. + * The {@link #sample(long)} method is synchronized, so may introduce contention compared to the unsafe Value version of + * sample. *

*/ public abstract class ThreadSafeValue extends Value { diff --git a/Base/src/main/java/io/deephaven/base/stats/Value.java b/Base/src/main/java/io/deephaven/base/stats/Value.java index 31e8af6316e..dd260221f1d 100644 --- a/Base/src/main/java/io/deephaven/base/stats/Value.java +++ b/Base/src/main/java/io/deephaven/base/stats/Value.java @@ -5,8 +5,8 @@ public abstract class Value { /** - * The sample(long) method is not thread safe; and you can get wrong answers out of it. - * If you require safety, you should instead use a ThreadSafeValue. + * The sample(long) method is not thread safe; and you can get wrong answers out of it. If you require safety, you + * should instead use a ThreadSafeValue. */ protected long n = 0; protected long last = 0; From 751339e79757bc51d54a8165aaeb4979509c7ca0 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 18:09:18 -0500 Subject: [PATCH 18/24] MOAR parallelism. --- .../engine/table/impl/sources/regioned/MergedDataIndex.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index be83c7d330d..b03594771d9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -371,10 +371,11 @@ private static Table loadIndexTableAndShiftRowSets( final List columns = new ArrayList<>(keyColumnNames.length + 1); Arrays.stream(keyColumnNames).map(ColumnName::of).forEach(columns::add); columns.add(SelectColumn.ofStateless(shiftFunction)); - return coalesced.update(columns); + return ForkJoinPoolOperationInitializer.ensureParallelizable(() -> coalesced.update(columns)).get(); } else { // pull the key columns into memory while we are parallel; but do not read all the 
RowSets - final Table withInMemoryKeyColumns = coalesced.update(keyColumnNames); + final Table withInMemoryKeyColumns = + ForkJoinPoolOperationInitializer.ensureParallelizable(() -> coalesced.update(keyColumnNames)).get(); return withInMemoryKeyColumns.updateView(List.of(SelectColumn.ofStateless(shiftFunction))); } } From 0663467db4bc4a6ade1c1f29802c9d328d141b20 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 18:21:13 -0500 Subject: [PATCH 19/24] more parallel again. --- .../engine/table/impl/sources/regioned/MergedDataIndex.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index b03594771d9..76d305b8bcc 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -266,7 +266,8 @@ private Table buildTable(final boolean lazyRowSetMerge) { if (lazyPartitionedTable != null) { // We are synchronized, and can begin processing from the PartitionedTable rather than starting from // scratch. The first step is to force our rowsets into memory, in parallel. - partitionedTable = lazyPartitionedTable.transform(t -> t.update(ROW_SET_COLUMN_NAME)); + partitionedTable = lazyPartitionedTable.transform(t -> ForkJoinPoolOperationInitializer + .ensureParallelizable(() -> t.update(ROW_SET_COLUMN_NAME)).get()); } else { partitionedTable = buildPartitionedTable(lazyRowSetMerge); } From 08d8b5943eefabd03d734bc8fbd0551d67de8bb8 Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Thu, 23 Jan 2025 18:27:03 -0500 Subject: [PATCH 20/24] Update engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java Co-authored-by: Ryan Caudy --- .../src/main/java/io/deephaven/engine/table/BasicDataIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index 08453259398..a33caf906ef 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java @@ -102,7 +102,7 @@ default ColumnSource rowSetColumn() { /** * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}. * - * @param options required for building the Index table this ColumnSource is retrieved from + * @param options parameters for controlling how the the table will be built (if necessary) in order to retrieve the result {@link RowSet} {@link ColumnSource} * * @return The {@link RowSet} {@link ColumnSource} */ From bd27aa74c877c75bc6360a68fa7b0eb5fe5cdeb1 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 18:27:19 -0500 Subject: [PATCH 21/24] Update engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java Co-authored-by: Ryan Caudy --- .../api/src/main/java/io/deephaven/engine/table/DataIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java index 8bea446650c..f248afb014c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndex.java @@ -57,7 +57,7 @@ default RowKeyLookup rowKeyLookup() { * {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. 
Lookup keys should be * in the order of the index's key columns. * - * @param options required for building the table, if required by this RowKeyLookup + * @param options parameters for building the table, if required by this RowKeyLookup * * @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key */ From 28bb1d7255b49a51c660e18eb9cb4ab427b82e21 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 18:27:25 -0500 Subject: [PATCH 22/24] Update engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java Co-authored-by: Ryan Caudy --- .../main/java/io/deephaven/engine/table/DataIndexOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java index 9fc4a988c03..12430d52c0c 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -20,7 +20,7 @@ public interface DataIndexOptions { DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); /** - * Static options that uses a partial table instead of the full table. + * Static options for operations that use a partial table instead of the full table. */ DataIndexOptions USING_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build(); From 96ba5025ad56e92835e2cfc7f2f723dd9ddfb840 Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Thu, 23 Jan 2025 18:27:30 -0500 Subject: [PATCH 23/24] Update engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java Co-authored-by: Ryan Caudy --- .../main/java/io/deephaven/engine/table/DataIndexOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java index 12430d52c0c..2498d36966d 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/DataIndexOptions.java @@ -15,7 +15,7 @@ @BuildableStyle public interface DataIndexOptions { /** - * Static default options, which uses a full table. + * Static default options, which expect that operations will use the full table. */ DataIndexOptions DEFAULT = DataIndexOptions.builder().build(); From 9fe9b61f499d531bd5eda6fad5a247248a53ff68 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Thu, 23 Jan 2025 18:28:18 -0500 Subject: [PATCH 24/24] spotless --- .../main/java/io/deephaven/engine/table/BasicDataIndex.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java index a33caf906ef..f13141073b1 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/BasicDataIndex.java @@ -102,7 +102,8 @@ default ColumnSource rowSetColumn() { /** * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table() table}. 
* - * @param options parameters for controlling how the the table will be built (if necessary) in order to retrieve the result {@link RowSet} {@link ColumnSource} + * @param options parameters for controlling how the table will be built (if necessary) in order to retrieve the + * result {@link RowSet} {@link ColumnSource} * * @return The {@link RowSet} {@link ColumnSource} */