Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: DH-18300: Improve DataIndex performance. (#6585) #6593

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Base/src/main/java/io/deephaven/base/AtomicUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public abstract class AtomicUtil {

Expand Down Expand Up @@ -267,4 +268,44 @@ public static int atomicAndNot(AtomicInteger i, int mask) {
} while (!i.compareAndSet(expect, update));
return update;
}

/**
 * Atomically lowers a {@code long} field to {@code value} when {@code value} is smaller than the field's current
 * contents.
 *
 * <p>
 * The field is re-read after every failed compare-and-set, so a concurrent writer that has already stored a value
 * less than or equal to {@code value} causes this method to give up without writing.
 * </p>
 *
 * @param o the object whose field is updated
 * @param fu the field updater used to read and compare-and-set the field
 * @param value the candidate for the minimum
 * @param <T> the type of o
 * @return true if this call stored {@code value} into the field; false if the field was already less than or equal
 *         to {@code value}
 */
public static <T> boolean setMin(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
    long observed;
    do {
        observed = fu.get(o);
        if (observed <= value) {
            // the field already holds the minimum; nothing to write
            return false;
        }
    } while (!fu.compareAndSet(o, observed, value));
    return true;
}

/**
 * Atomically raises a {@code long} field to {@code value} when {@code value} is larger than the field's current
 * contents.
 *
 * <p>
 * The field is re-read after every failed compare-and-set, so a concurrent writer that has already stored a value
 * greater than or equal to {@code value} causes this method to give up without writing.
 * </p>
 *
 * @param o the object whose field is updated
 * @param fu the field updater used to read and compare-and-set the field
 * @param value the candidate for the maximum
 * @param <T> the type of o
 * @return true if this call stored {@code value} into the field; false if the field was already greater than or
 *         equal to {@code value}
 */
public static <T> boolean setMax(final T o, final AtomicLongFieldUpdater<T> fu, final long value) {
    long observed;
    do {
        observed = fu.get(o);
        if (observed >= value) {
            // the field already holds the maximum; nothing to write
            return false;
        }
    } while (!fu.compareAndSet(o, observed, value));
    return true;
}
}
30 changes: 30 additions & 0 deletions Base/src/main/java/io/deephaven/base/stats/ThreadSafeCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.base.stats;

import java.util.function.LongFunction;

// --------------------------------------------------------------------

/**
 * A thread-safe statistic whose values are additive, so that the sum of the recorded values <U>does</U> carry
 * meaning (for example event counts or processing durations). When the sum of the values <I>does not</I> have a
 * useful interpretation, use {@link State} instead.
 * <UL>
 * <LI>{@link #increment} updates the counter, recording a single value. This is the most common usage. ({@link #sample}
 * does exactly the same thing but is a poor verb to use with a Counter.)
 * </UL>
 */
public class ThreadSafeCounter extends ThreadSafeValue {

    /**
     * Factory producing a new counter from the {@code long} argument that is handed to
     * {@link #ThreadSafeCounter(long)}.
     */
    public static final LongFunction<ThreadSafeCounter> FACTORY = now -> new ThreadSafeCounter(now);

    /**
     * Create a counter.
     *
     * @param now forwarded unchanged to the {@link ThreadSafeValue} constructor
     */
    public ThreadSafeCounter(final long now) {
        super(now);
    }

    /**
     * @return the same type tag that {@link Counter} reports, {@link Counter#TYPE_TAG}
     */
    public char getTypeTag() {
        return Counter.TYPE_TAG;
    }
}
37 changes: 37 additions & 0 deletions Base/src/main/java/io/deephaven/base/stats/ThreadSafeValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.base.stats;

import io.deephaven.base.AtomicUtil;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
 * A thread-safe extension of the {@link Value} class.
 *
 * <p>
 * Both {@link #sample(long)} and {@link #toString()} run while holding this object's monitor, so concurrent callers
 * are serialized. That may introduce contention compared to the unsafe {@link Value} version of sample.
 * </p>
 */
public abstract class ThreadSafeValue extends Value {

    public ThreadSafeValue(long now) {
        super(now);
    }

    protected ThreadSafeValue(History history) {
        super(history);
    }

    /**
     * Record a sample while holding this object's monitor.
     *
     * @param x the value to record
     */
    @Override
    public void sample(final long x) {
        synchronized (this) {
            super.sample(x);
        }
    }

    /**
     * Format this value while holding this object's monitor, so that a concurrent {@link #sample(long)} cannot
     * interleave with the formatting.
     *
     * @return the {@link Value#toString()} representation
     */
    @Override
    public String toString() {
        synchronized (this) {
            return super.toString();
        }
    }
}
18 changes: 16 additions & 2 deletions Base/src/main/java/io/deephaven/base/stats/Value.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package io.deephaven.base.stats;

public abstract class Value {

/**
* The {@link #sample(long)} method is not thread safe, and concurrent callers can produce incorrect results. If you
* require thread safety, use a {@link ThreadSafeValue} instead.
*/
protected long n = 0;
protected long last = 0;
protected long sum = 0;
Expand Down Expand Up @@ -51,7 +54,7 @@ protected Value(History history) {
this.history = history;
}

public void sample(long x) {
public void sample(final long x) {
n++;
last = x;
sum += x;
Expand Down Expand Up @@ -99,4 +102,15 @@ public void update(Item item, ItemUpdateListener listener, long logInterval, lon
}
}
}

/**
 * Format the sample count and, when at least one sample has been recorded, the sum, max, min, average and sample
 * standard deviation of this value.
 *
 * <p>
 * The standard deviation is reported as NaN when there is only a single sample. With more than one sample the
 * computed variance is clamped at zero before the square root is taken: the subtraction of two nearly equal
 * floating-point quantities can come out slightly negative for (near-)constant samples, which would otherwise be
 * printed as NaN.
 * </p>
 *
 * @return a human-readable summary of this value
 */
@Override
public String toString() {
    if (n <= 0) {
        return String.format("Value{n=%,d}", n);
    }
    final double avg = (double) sum / n;
    final double std;
    if (n > 1) {
        final double variance = (sum2 - ((double) sum * sum / (double) n)) / (n - 1);
        // Math.max propagates NaN, so only rounding-induced negative variances are altered here
        std = Math.sqrt(Math.max(0.0, variance));
    } else {
        std = Double.NaN;
    }
    return String.format("Value{n=%,d, sum=%,d, max=%,d, min=%,d, avg=%,.3f, std=%,.3f}", n, sum, max, min, avg,
            std);
}
}
39 changes: 39 additions & 0 deletions Base/src/test/java/io/deephaven/base/stats/TestValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import junit.framework.TestCase;

import java.util.concurrent.Semaphore;
import java.util.function.LongFunction;

// --------------------------------------------------------------------
Expand Down Expand Up @@ -49,6 +50,44 @@ public void testCounter() {
checkValue(Counter.FACTORY);
}

/**
 * Runs {@link #doTestToString()} ten times in a row.
 */
public void testToString() {
    // This is purposefully a heisentest: repeating the concurrent scenario should make it fail if the
    // implementation is really broken.
    final int repetitions = 10;
    int attempt = 0;
    while (attempt < repetitions) {
        doTestToString();
        attempt++;
    }
}

/**
 * Samples a {@link ThreadSafeCounter} once from each of {@code n} threads that are released together, then checks
 * the accumulated statistics and the {@code toString()} output.
 */
public void doTestToString() {
    // we are testing toString, but also creating a pile of threads so that sample(long) is called concurrently
    // on the thread-safe value
    final Value counter = ThreadSafeCounter.FACTORY.apply(0L);

    final String emptyString = counter.toString();
    assertEquals("Value{n=0}", emptyString);

    // "semaphore" holds every worker back until all have been started; "completion" counts finished workers
    final Semaphore semaphore = new Semaphore(0);
    final Semaphore completion = new Semaphore(0);
    final int n = 100;
    for (int ii = 0; ii < n; ++ii) {
        final int fii = ii;
        new Thread(() -> {
            semaphore.acquireUninterruptibly();
            counter.sample(fii);
            completion.release();
        }).start();
    }
    // release and await exactly as many permits as threads were started, rather than a separate literal
    semaphore.release(n);
    completion.acquireUninterruptibly(n);

    assertEquals(n, counter.getN());
    assertEquals(((n - 1) * n) / 2, counter.getSum());
    assertEquals(0, counter.getMin());
    assertEquals(n - 1, counter.getMax());

    // the expected text below is specific to n == 100 (samples 0..99)
    final String asString = counter.toString();
    assertEquals("Value{n=100, sum=4,950, max=99, min=0, avg=49.500, std=29.011}", asString);
}

// ----------------------------------------------------------------
private void checkValue(LongFunction<? extends Value> factory) {
Value value = factory.apply(1000L);
Expand Down
5 changes: 5 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ For more information, see:
* [gh pr create](https://cli.github.com/manual/gh_pr_create)
* [CLI In Use](https://cli.github.com/manual/examples.html)

## Labels

- Each pull request must have a `ReleaseNotesNeeded` label if a user can perceive the changes. Build or testing-only changes should have a `NoReleaseNotesNeeded` label.
- Each pull request must have a `DocumentationNeeded` label if the user guide should be updated. Pull requests that do not require a user guide update should have a `NoDocumentationNeeded` label.

## Styleguide
The [styleguide](style/README.md) is applied globally to the entire project, except for generated code that gets checked in.
To apply the styleguide, run `./gradlew spotlessApply`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,46 @@ default ColumnSource<?>[] keyColumns(@NotNull final ColumnSource<?>[] indexedCol
@FinalDefault
@NotNull
default ColumnSource<RowSet> rowSetColumn() {
return table().getColumnSource(rowSetColumnName(), RowSet.class);
return rowSetColumn(DataIndexOptions.DEFAULT);
}

/**
 * Get the {@link RowSet} {@link ColumnSource} of the index {@link #table(DataIndexOptions) table}.
 *
 * @param options parameters for controlling how the table will be built (if necessary) in order to retrieve the
 *        result {@link RowSet} {@link ColumnSource}
 *
 * @return The {@link RowSet} {@link ColumnSource}
 */
@FinalDefault
@NotNull
default ColumnSource<RowSet> rowSetColumn(final DataIndexOptions options) {
    // resolve the backing table with the caller's options first, then look up the row set column on it
    final Table indexTable = table(options);
    return indexTable.getColumnSource(rowSetColumnName(), RowSet.class);
}

/**
* Get the {@link Table} backing this data index.
*
* <p>
* The returned table is fully in-memory, equivalent to {@link #table(DataIndexOptions)} with default options.
* </p>
*
* @return The {@link Table}
*/
@NotNull
Table table();
default Table table() {
return table(DataIndexOptions.DEFAULT);
}

/**
* Get the {@link Table} backing this data index.
*
* <p>
* The no-argument {@link #table()} delegates to this method with {@link DataIndexOptions#DEFAULT}.
* </p>
*
* @param options parameters to control the returned table
*
* @return The {@link Table}
*/
@NotNull
Table table(DataIndexOptions options);

/**
* Whether the index {@link #table()} {@link Table#isRefreshing() is refreshing}. Some transformations will force
Expand All @@ -115,4 +145,5 @@ default ColumnSource<RowSet> rowSetColumn() {
* otherwise
*/
boolean isRefreshing();

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,21 @@ interface RowKeyLookup {
* @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
*/
@NotNull
RowKeyLookup rowKeyLookup();
default RowKeyLookup rowKeyLookup() {
return rowKeyLookup(DataIndexOptions.DEFAULT);
}

/**
* Build a {@link RowKeyLookup lookup function} of row keys for this index. If {@link #isRefreshing()} is
* {@code true}, this lookup function is only guaranteed to be accurate for the current cycle. Lookup keys should be
* in the order of the index's key columns.
*
* <p>
* The no-argument {@link #rowKeyLookup()} delegates to this method with {@link DataIndexOptions#DEFAULT}.
* </p>
*
* @param options parameters for building the table, if required by this RowKeyLookup
*
* @return A function that provides map-like lookup of index {@link #table()} row keys from an index lookup key
*/
@NotNull
RowKeyLookup rowKeyLookup(DataIndexOptions options);

/**
* Return a {@link RowKeyLookup lookup function} of index row keys for this index. If
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table;

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.filter.Filter;
import org.immutables.value.Value;

/**
* Options for controlling the function of a {@link DataIndex}.
*
* <p>
* Instances are created through {@link #builder()}; the shared {@link #DEFAULT} and {@link #USING_PARTIAL_TABLE}
* constants cover the two possible settings of {@link #operationUsesPartialTable()}.
* </p>
*/
@Value.Immutable
@BuildableStyle
public interface DataIndexOptions {
/**
* Static default options, which expect that operations will use the full table.
*/
DataIndexOptions DEFAULT = DataIndexOptions.builder().build();

/**
* Static options for operations that use a partial table instead of the full table.
*/
DataIndexOptions USING_PARTIAL_TABLE = DataIndexOptions.builder().operationUsesPartialTable(true).build();

/**
* Does this operation use only a subset of the DataIndex?
*
* <p>
* The DataIndex implementation may use this hint to defer work for some row sets.
* </p>
*
* <p>
* Presently, this is used for the {@link Table#where(Filter)} operation to hint that work for computing
* {@link io.deephaven.engine.rowset.RowSet RowSets} for non-matching keys should be deferred.
* </p>
*
* @return if this operation is only going to use a subset of this data index; {@code false} unless set otherwise
*         on the builder
*/
@Value.Default
default boolean operationUsesPartialTable() {
return false;
}

/**
* Create a new builder for a {@link DataIndexOptions}.
*
* @return a new {@link Builder}
*/
static Builder builder() {
return ImmutableDataIndexOptions.builder();
}

/**
* The builder interface to construct a {@link DataIndexOptions}.
*/
interface Builder {
/**
* Set whether this operation only uses a subset of the data index.
*
* @param usesPartialTable true if this operation only uses a partial table
* @return this builder
*/
Builder operationUsesPartialTable(boolean usesPartialTable);

/**
* Build the {@link DataIndexOptions}.
*
* @return an immutable DataIndexOptions structure.
*/
DataIndexOptions build();
}
}
Loading
Loading