Skip to content

Commit

Permalink
perf: DH-18300: Improve DataIndex performance. (#6585)
Browse files Browse the repository at this point in the history
DataIndex, particularly when used for where() filters, missed
parallelization opportunities and would read more data than strictly
necessary to satisfy the filter.

---------

Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
  • Loading branch information
cpwright and rcaudy committed Jan 24, 2025
1 parent 824a1c6 commit 5841eec
Show file tree
Hide file tree
Showing 22 changed files with 775 additions and 166 deletions.
41 changes: 41 additions & 0 deletions Base/src/main/java/io/deephaven/base/AtomicUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public abstract class AtomicUtil {

Expand Down Expand Up @@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) {
} while (!i.compareAndSet(expect, update));
return update;
}

/**
 * Atomically lowers the field to {@code value} when {@code value} is strictly less than the field's current value.
 *
 * <p>
 * The compare-and-set is retried until it succeeds, or until the observed field value is already less than or equal
 * to {@code value}.
 * </p>
 *
 * @param o the object whose field is updated
 * @param fu the field updater used to read and write the field of {@code o}
 * @param value the candidate for the minimum
 * @param <T> the type of {@code o}
 * @return true if this call stored {@code value} in the field; false if the field was left unchanged
 */
public static <T> boolean setMin(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
    while (true) {
        final long observed = fu.get(o);
        if (observed <= value) {
            // The field already holds a value at least as small as the candidate.
            return false;
        }
        if (fu.compareAndSet(o, observed, value)) {
            return true;
        }
        // Lost a race with a concurrent writer; re-read and try again.
    }
}

/**
 * Atomically raises the field to {@code value} when {@code value} is strictly greater than the field's current
 * value.
 *
 * <p>
 * The compare-and-set is retried until it succeeds, or until the observed field value is already greater than or
 * equal to {@code value}.
 * </p>
 *
 * @param o the object whose field is updated
 * @param fu the field updater used to read and write the field of {@code o}
 * @param value the candidate for the maximum
 * @param <T> the type of {@code o}
 * @return true if this call stored {@code value} in the field; false if the field was left unchanged
 */
public static <T> boolean setMax(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
    while (true) {
        final long observed = fu.get(o);
        if (observed >= value) {
            // The field already holds a value at least as large as the candidate.
            return false;
        }
        if (fu.compareAndSet(o, observed, value)) {
            return true;
        }
        // Lost a race with a concurrent writer; re-read and try again.
    }
}
}
30 changes: 30 additions & 0 deletions Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.base.stats;

import java.util.function.LongFunction;

// --------------------------------------------------------------------

/**
 * A {@link ThreadSafeValue} for additive quantities, where the sum of the recorded values <u>does</u> have meaning.
 * Examples include event counts and processing duration. If the sum of the values <i>does not</i> have a useful
 * interpretation, use {@link State} instead.
 * <ul>
 * <li>{@link #increment} updates the counter, recording a single value. This is the most common usage.
 * ({@link #sample} does exactly the same thing but is a poor verb to use with a Counter.)</li>
 * </ul>
 *
 * <p>
 * This type reports the same type tag as {@link Counter}.
 * </p>
 */
public class ThreadSafeCounter extends ThreadSafeValue {

    /** Factory that creates a new counter from a timestamp, by calling {@link #ThreadSafeCounter(long)}. */
    public static final LongFunction<ThreadSafeCounter> FACTORY = now -> new ThreadSafeCounter(now);

    /**
     * Creates a counter, passing {@code now} through to the superclass constructor.
     *
     * @param now the timestamp handed to the superclass constructor
     */
    public ThreadSafeCounter(final long now) {
        super(now);
    }

    /**
     * @return {@link Counter#TYPE_TAG}
     */
    public char getTypeTag() {
        return Counter.TYPE_TAG;
    }
}
37 changes: 37 additions & 0 deletions Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.base.stats;

import io.deephaven.base.AtomicUtil;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
 * A thread-safe extension of the {@link Value} class.
 *
 * <p>
 * {@link #sample(long)} and {@link #toString()} hold this instance's monitor while delegating to the {@link Value}
 * implementation, so they may introduce contention compared to the unsafe {@link Value} versions.
 * </p>
 *
 * <p>
 * NOTE(review): only {@code sample} and {@code toString} are overridden here; confirm whether any other inherited
 * members need the same protection.
 * </p>
 */
public abstract class ThreadSafeValue extends Value {

    /**
     * @param now the timestamp handed to the {@link Value} constructor
     */
    public ThreadSafeValue(long now) {
        super(now);
    }

    /**
     * @param history the history handed to the {@link Value} constructor
     */
    protected ThreadSafeValue(History history) {
        super(history);
    }

    /**
     * Records a sample while holding this instance's monitor.
     *
     * @param x the value to record
     */
    @Override
    public void sample(final long x) {
        synchronized (this) {
            super.sample(x);
        }
    }

    /**
     * Formats this value while holding this instance's monitor, so the summary is not interleaved with a concurrent
     * {@link #sample(long)}.
     *
     * @return the {@link Value#toString()} result
     */
    @Override
    public String toString() {
        synchronized (this) {
            return super.toString();
        }
    }
}
18 changes: 16 additions & 2 deletions Base/src/main/java/io/deephaven/base/stats/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package io.deephaven.base.stats;

public abstract class Value {

/**
* {@link #sample(long)} is not thread safe; concurrent calls can produce incorrect results. If you require thread
* safety, use a {@link ThreadSafeValue} instead.
*/
protected long n = 0;
protected long last = 0;
protected long sum = 0;
Expand Down Expand Up @@ -51,7 +54,7 @@ protected Value(History history) {
this.history = history;
}

public void sample(long x) {
public void sample(final long x) {
n++;
last = x;
sum += x;
Expand Down Expand Up @@ -99,4 +102,15 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon
}
}
}

/**
 * Formats a human-readable summary of the recorded samples.
 *
 * <p>
 * With no samples the result is {@code Value{n=0}}. Otherwise the count, sum, max, min, average and sample standard
 * deviation are included. The standard deviation is {@code NaN} when there is exactly one sample.
 * </p>
 *
 * <p>
 * Numbers are always formatted with {@link java.util.Locale#ROOT}, so the output does not depend on the JVM's default
 * locale.
 * </p>
 *
 * @return the formatted summary
 */
@Override
public String toString() {
    if (n <= 0) {
        return String.format(java.util.Locale.ROOT, "Value{n=%,d}", n);
    }
    final double avg = (double) sum / n;
    final double std;
    if (n > 1) {
        final double variance = (sum2 - ((double) sum * sum / (double) n)) / (n - 1);
        // Floating-point cancellation can leave a tiny negative variance when the samples are (nearly) identical;
        // clamp to zero so the square root does not turn it into NaN.
        std = Math.sqrt(Math.max(0.0, variance));
    } else {
        // A single sample has no sample standard deviation.
        std = Double.NaN;
    }
    return String.format(java.util.Locale.ROOT,
            "Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg, std);
}
}
39 changes: 39 additions & 0 deletions Base/src/test/java/io/deephaven/base/stats/TestValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import junit.framework.TestCase;

import java.util.concurrent.Semaphore;
import java.util.function.LongFunction;

// --------------------------------------------------------------------
Expand Down Expand Up @@ -49,6 +50,44 @@ public void testCounter() {
checkValue(Counter.FACTORY);
}

public void testToString() {
    // Deliberately a heisentest: repeating the concurrent scenario several times makes a genuinely broken
    // implementation likely to fail.
    int remaining = 10;
    while (remaining-- > 0) {
        doTestToString();
    }
}

/**
 * Samples the values {@code 0..n-1} into a {@link ThreadSafeCounter} from {@code n} concurrently released threads,
 * then checks the aggregate statistics and the {@code toString()} output.
 */
public void doTestToString() {
    // We are testing toString, but also creating a pile of threads so that sample(long) is called concurrently on
    // the thread-safe counter.
    final Value counter = ThreadSafeCounter.FACTORY.apply(0L);

    final String emptyString = counter.toString();
    assertEquals("Value{n=0}", emptyString);

    final Semaphore semaphore = new Semaphore(0);
    final Semaphore completion = new Semaphore(0);
    final int n = 100;
    for (int ii = 0; ii < n; ++ii) {
        final int fii = ii;
        new Thread(() -> {
            semaphore.acquireUninterruptibly();
            try {
                counter.sample(fii);
            } finally {
                // Always signal completion so a failure in sample() fails the assertions below instead of hanging
                // the test forever.
                completion.release();
            }
        }).start();
    }
    // Release every worker at once, then wait for all of them; both counts must match the number of threads.
    semaphore.release(n);
    completion.acquireUninterruptibly(n);

    assertEquals(n, counter.getN());
    assertEquals(((n - 1) * n) / 2, counter.getSum());
    assertEquals(0, counter.getMin());
    assertEquals(n - 1, counter.getMax());

    // The expected text below is specific to n == 100.
    final String asString = counter.toString();
    assertEquals("Value{n=100, sum=4,950, max=99, min=0, avg=49.500, std=29.011}", asString);
}

// ----------------------------------------------------------------
private void checkValue(LongFunction<? extends Value> factory) {
Value value = factory.apply(1000L);
Expand Down
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ For more information, see:
* [gh pr create](https://cli.github.com/manual/gh_pr_create)
* [CLI In Use](https://cli.github.com/manual/examples.html)

## Labels

- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label.

## Styleguide
The [styleguide](style/README.md) is applied globally to the entire project, except for generated code that gets checked in.
To apply the styleguide, run `./gradlew spotlessApply`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,46 @@ default ColumnSource<?>[] keyColumns(@NotNull final ColumnSource<?>[] indexedCol
@FinalDefault
@NotNull
default ColumnSource<RowSet> rowSetColumn() {
    // Convenience overload: delegate to the options-aware variant with the default options. The earlier direct
    // table().getColumnSource(...) return is dropped; two consecutive returns leave the second one unreachable.
    return rowSetColumn(DataIndexOptions.DEFAULT);
}

/**
 * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table(DataIndexOptions) table}.
 *
 * @param options parameters for controlling how the table will be built (if necessary) in order to retrieve the
 *        result {@link RowSet} {@link ColumnSource}
 *
 * @return The {@link RowSet} {@link ColumnSource}
 */
@FinalDefault
@NotNull
default ColumnSource<RowSet> rowSetColumn(final DataIndexOptions options) {
    // Obtain the index table first, then look up the RowSet column by name.
    final Table indexTable = table(options);
    return indexTable.getColumnSource(rowSetColumnName(), RowSet.class);
}

/**
 * Get the {@link Table} backing this data index.
 *
 * <p>
 * The returned table is fully in-memory, equivalent to {@link #table(DataIndexOptions)} with
 * {@link DataIndexOptions#DEFAULT default options}.
 * </p>
 *
 * @return The {@link Table}
 */
@NotNull
default Table table() {
    // Only the default method is kept; declaring the same signature a second time as abstract is a duplicate
    // declaration.
    return table(DataIndexOptions.DEFAULT);
}

/**
* Get the {@link Table} backing this data index.
*
* @param options {@link DataIndexOptions} that control the returned table; for example,
* {@link DataIndexOptions#operationUsesPartialTable()} hints that the caller will only use a subset of the index
*
* @return The {@link Table}
*/
@NotNull
Table table(DataIndexOptions options);

/**
* Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force
Expand All @@ -115,4 +145,5 @@ default ColumnSource<RowSet> rowSetColumn() {
* otherwise
*/
boolean isRefreshing();

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,21 @@ interface RowKeyLookup {
* @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
*/
@NotNull
default RowKeyLookup rowKeyLookup() {
    // Convenience overload: delegate to the options-aware variant with the default options. Only the default
    // method is kept; declaring the same signature a second time as abstract is a duplicate declaration.
    return rowKeyLookup(DataIndexOptions.DEFAULT);
}

/**
* Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is
* {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be
* in the order of the index's key columns.
*
* @param options {@link DataIndexOptions} used when building the index table, if this {@link RowKeyLookup} requires
* the table to be built
*
* @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
*/
@NotNull
RowKeyLookup rowKeyLookup(DataIndexOptions options);

/**
* Return a {@link RowKeyLookup lookup function} function of index row keys for this index. If
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.filter.Filter;
import org.immutables.value.Value;

/**
* Options for controlling the function of a {@link DataIndex}.
*
* <p>
* Instances are created through {@link #builder()}; {@link #DEFAULT} and {@link #USING_PARTIAL_TABLE} are provided for
* the two combinations currently available.
* </p>
*/
@Value.Immutable
@BuildableStyle
public interface DataIndexOptions {
/**
* Static default options, which expect that operations will use the full table.
*/
DataIndexOptions DEFAULT = DataIndexOptions.builder().build();

/**
* Static options for operations that use a partial table instead of the full table.
*/
DataIndexOptions USING_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build();

/**
* Does this operation use only a subset of the DataIndex?
*
* <p>
* The DataIndex implementation may use this hint to defer work for some row sets.
* </p>
*
* <p>
* Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing
* {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred.
* </p>
*
* @return if this operation is only going to use a subset of this data index; {@code false} unless set otherwise
*/
@Value.Default
default boolean operationUsesPartialTable() {
return false;
}

/**
* Create a new builder for a {@link DataIndexOptions}.
*
* @return a {@link Builder} for constructing {@link DataIndexOptions}
*/
static Builder builder() {
return ImmutableDataIndexOptions.builder();
}

/**
* The builder interface to construct a {@link DataIndexOptions}.
*/
interface Builder {
/**
* Set whether this operation only uses a subset of the data index.
*
* @param usesPartialTable true if this operation only uses a partial table
* @return this builder
*/
Builder operationUsesPartialTable(boolean usesPartialTable);

/**
* Build the {@link DataIndexOptions}.
*
* @return an immutable DataIndexOptions structure.
*/
DataIndexOptions build();
}
}
Loading

0 comments on commit 5841eec

Please sign in to comment.