Preserve empty and key initializer table support for aggregations #2719

Merged
merged 26 commits on Aug 9, 2022
Changes from 19 commits
Commits (26)
30521ea
Add preserveEmpty and initialGroups to the TableOperations interface …
rcaudy Jun 13, 2022
99a58ee
Plumb preserveEmpty and initialGroups into ChunkedOperatorAggregation…
rcaudy Jun 26, 2022
c1fe2bf
Delete legacy hash tables for aggregation
rcaudy Aug 4, 2022
d56795c
Delete legacy hash table replication code for updateBy
rcaudy Aug 4, 2022
7b6cf7e
Remove all use of now-deleted legacy aggregation hash tables. Remove …
rcaudy Aug 5, 2022
95da8cf
WIP on grouping support for key initialization tables
rcaudy Aug 6, 2022
d351cd0
Finish hashing and adding for key initialization tables
rcaudy Aug 6, 2022
880d7cd
Handle initial row set population for key initializers + preserve empty
rcaudy Aug 7, 2022
86bb821
Spotless
rcaudy Aug 7, 2022
65e8f19
Add convenience partitionedAggBy
rcaudy Aug 7, 2022
07d812f
Spotless again
rcaudy Aug 7, 2022
eaa8446
Fix count operator bug for selectDistinct
rcaudy Aug 7, 2022
f56dbc6
Add key column validation to ChunkedOperatorAggregationHelper
rcaudy Aug 7, 2022
f88c7e1
Begin resurrecting unit tests that used TableMap.populateKeys
rcaudy Aug 7, 2022
e92bc41
Finish resurrecting old populateKeys unit tests
rcaudy Aug 7, 2022
7662002
Make it so ephemeral column source groups are actually cacheable unti…
rcaudy Aug 8, 2022
0ddaeec
Allow empty groups to be added to operators in ChunkedOperatorAggrega…
rcaudy Aug 8, 2022
2e2fc1e
Add unit tests for preserveEmpty and initialGroups
rcaudy Aug 8, 2022
ee5cdff
Out damn spot
rcaudy Aug 8, 2022
8474bc2
Eliminate unused rowCountSource from IncrementalChunkedOperatorAggreg…
rcaudy Aug 8, 2022
1d5bbec
Fix memoization key for aggBy to be aware of new parameters
rcaudy Aug 8, 2022
0c5e3ae
Unit test for preserve empty with no-key aggs
rcaudy Aug 9, 2022
d9a37ac
Refactor StateChangeRecorder into a superclass of CountAggregationOpe…
rcaudy Aug 9, 2022
2957c75
Extend StateChangeRecorder in a number of operators that require row …
rcaudy Aug 9, 2022
c8aebaa
Spotless
rcaudy Aug 9, 2022
fb9d81f
Some clean up to StateChangeRecorder functionality, and support Formu…
rcaudy Aug 9, 2022
@@ -110,11 +110,6 @@ default void releaseCachedResources() {
<ALTERNATE_DATA_TYPE> ColumnSource<ALTERNATE_DATA_TYPE> reinterpret(
@NotNull final Class<ALTERNATE_DATA_TYPE> alternateDataType) throws IllegalArgumentException;

@Override
default List<ColumnSource> getColumnSources() {
return Collections.singletonList(this);
}

@Override
default T createTuple(final long rowKey) {
return get(rowKey);
@@ -9,6 +9,7 @@
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.engine.liveness.LivenessNode;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.updategraph.ConcurrentMethod;
import io.deephaven.util.annotations.FinalDefault;
import org.jetbrains.annotations.NotNull;

@@ -62,6 +63,7 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
*
* @return The underlying {@link Table partitioned table}
*/
@ConcurrentMethod
Table table();

/**
@@ -70,6 +72,7 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
*
* @return The key column names
*/
@ConcurrentMethod
Set<String> keyColumnNames();

/**
@@ -82,13 +85,15 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
*
* @return Whether the keys in the underlying partitioned table are unique
*/
@ConcurrentMethod
boolean uniqueKeys();

/**
* Get the name of the "constituent" column of {@link Table tables}.
*
* @return The constituent column name
*/
@ConcurrentMethod
String constituentColumnName();

/**
@@ -98,6 +103,7 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
*
* @return The constituent definition
*/
@ConcurrentMethod
TableDefinition constituentDefinition();

/**
@@ -115,6 +121,7 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
*
* @return Whether the constituents of the underlying partitioned table can change
*/
@ConcurrentMethod
boolean constituentChangesPermitted();

/**
@@ -125,6 +132,7 @@ interface Proxy extends TableOperations<Proxy, TableOperations<?, ?>> {
* @see #proxy(boolean, boolean)
*/
@FinalDefault
@ConcurrentMethod
default Proxy proxy() {
return proxy(true, true);
}
@@ -147,6 +155,7 @@ default Proxy proxy() {
* @return A proxy that allows {@link TableOperations table operations} to be applied to the constituent tables of
* this PartitionedTable
*/
@ConcurrentMethod
Proxy proxy(boolean requireMatchingKeys, boolean sanityCheckJoinOperations);

/**
@@ -169,6 +178,7 @@ default Proxy proxy() {
* @param filters The filters to apply. Must not reference the constituent column.
* @return The filtered PartitionedTable
*/
@ConcurrentMethod
PartitionedTable filter(Collection<? extends Filter> filters);

/**
@@ -180,6 +190,7 @@ default Proxy proxy() {
* @param sortColumns The columns to sort by. Must not reference the constituent column.
* @return The sorted PartitionedTable
*/
@ConcurrentMethod
PartitionedTable sort(Collection<SortColumn> sortColumns);

/**
@@ -242,6 +253,7 @@ PartitionedTable partitionedTransform(
* @return The {@link Table constituent} at the single row in {@link #table()} matching the {@code keyColumnValues},
* or {@code null} if no matches were found
*/
@ConcurrentMethod
Table constituentFor(@NotNull Object... keyColumnValues);

/**
@@ -255,5 +267,6 @@ PartitionedTable partitionedTransform(
*
* @return An array of all current {@link Table constituents}
*/
@ConcurrentMethod
Table[] constituents();
}
42 changes: 38 additions & 4 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
@@ -142,8 +142,10 @@ public interface Table extends
* all rows:
* <ol>
* <li>{@link #groupBy} is unsupported
* <li>{@link #aggBy} is unsupported if {@link Aggregation#AggGroup(String...)} is used
* <li>{@link #partitionBy} is unsupported</li>
* <li>{@link #partitionedAggBy(Collection, boolean, Table, String...) partitionedAggBy} is unsupported</li>
* <li>{@link #aggBy} is unsupported if either of {@link io.deephaven.api.agg.spec.AggSpecGroup group} or
* {@link io.deephaven.api.agg.Partition partition} are used</li>
* <li>{@link #rollup(Collection, boolean, ColumnName...) rollup()} is unsupported if
* {@code includeConstituents == true}</li>
* <li>{@link #treeTable(String, String) treeTable()} is unsupported</li>
@@ -995,6 +997,14 @@ Table join(Table rightTable, Collection<? extends JoinMatch> columnsToMatch,
@ConcurrentMethod
Table aggBy(Aggregation aggregation);

@Override
@ConcurrentMethod
Table aggBy(Collection<? extends Aggregation> aggregations);

@Override
@ConcurrentMethod
Table aggBy(Collection<? extends Aggregation> aggregations, boolean preserveEmpty);

@Override
@ConcurrentMethod
Table aggBy(Aggregation aggregation, String... groupByColumns);
@@ -1005,15 +1015,16 @@ Table join(Table rightTable, Collection<? extends JoinMatch> columnsToMatch,

@Override
@ConcurrentMethod
Table aggBy(Collection<? extends Aggregation> aggregations, Collection<? extends ColumnName> groupByColumns);
Table aggBy(Collection<? extends Aggregation> aggregations, String... groupByColumns);

@Override
@ConcurrentMethod
Table aggBy(Collection<? extends Aggregation> aggregations, String... groupByColumns);
Table aggBy(Collection<? extends Aggregation> aggregations, Collection<? extends ColumnName> groupByColumns);

@Override
@ConcurrentMethod
Table aggBy(Collection<? extends Aggregation> aggregations);
Table aggBy(Collection<? extends Aggregation> aggregations, boolean preserveEmpty, Table initialGroups,
Collection<? extends ColumnName> groupByColumns);

Table headBy(long nRows, Collection<String> groupByColumnNames);

@@ -1602,6 +1613,29 @@ Table join(Table rightTable, Collection<? extends JoinMatch> columnsToMatch,
@ConcurrentMethod
PartitionedTable partitionBy(String... keyColumnNames);

/**
* Convenience method that performs an {@link #aggBy(Collection, boolean, Table, Collection)} and wraps the result
* in a {@link PartitionedTable}. If {@code aggregations} does not include a {@link io.deephaven.api.agg.Partition
* partition}, one will be added automatically with the default constituent column name and behavior used in
* {@link #partitionBy(String...)}.
*
* @param aggregations The {@link Aggregation aggregations} to apply
* @param preserveEmpty Whether to keep result rows for groups that are initially empty or become empty as a result
* of updates. Each aggregation operator defines its own value for empty groups.
* @param initialGroups A table whose distinct combinations of values for the {@code groupByColumns} should be used
* to create an initial set of aggregation groups. All other columns are ignored. This is useful in
* combination with {@code preserveEmpty == true} to ensure that particular groups appear in the result
* table, or with {@code preserveEmpty == false} to control the encounter order for a collection of groups
* and thus their relative order in the result. Changes to {@code initialGroups} are not expected or handled;
* if {@code initialGroups} is a refreshing table, only its contents at instantiation time will be used. If
* {@code initialGroups == null}, the result will be the same as if a table with no rows was supplied.
* @param keyColumnNames The names of the key columns to aggregate by
* @return A {@link PartitionedTable} keyed by {@code keyColumnNames}
*/
@ConcurrentMethod
PartitionedTable partitionedAggBy(Collection<? extends Aggregation> aggregations, boolean preserveEmpty,
Table initialGroups, String... keyColumnNames);

// -----------------------------------------------------------------------------------------------------------------
// Hierarchical table operations (rollup and treeTable).
// -----------------------------------------------------------------------------------------------------------------
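A rough usage sketch of the two new entry points added above, the aggBy overload taking preserveEmpty and initialGroups, and the partitionedAggBy convenience method. This sketch is not part of the PR: the table contents and column names are invented, and the Aggregation and TableTools factory methods used are the standard Deephaven ones.

import java.util.List;

import io.deephaven.api.ColumnName;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;

public class PreserveEmptyExample {
    public static void main(String[] args) {
        // Source data: only two of the three expected symbols actually appear.
        final Table source = TableTools.newTable(
                TableTools.stringCol("Sym", "AAPL", "AAPL", "MSFT"),
                TableTools.intCol("Size", 100, 200, 300));

        // Key initializer table: every group that should exist in the result.
        final Table initialGroups = TableTools.newTable(
                TableTools.stringCol("Sym", "AAPL", "MSFT", "GOOG"));

        final List<Aggregation> aggs = List.of(
                Aggregation.AggCount("N"),
                Aggregation.AggSum("TotalSize=Size"));

        // New aggBy overload: with preserveEmpty == true, the GOOG group is kept even
        // though no source rows map to it; each operator supplies its own value for
        // empty groups (for example, 0 for the count).
        final Table bySym = source.aggBy(aggs, true, initialGroups, List.of(ColumnName.of("Sym")));

        // New convenience method: the same aggregation wrapped as a PartitionedTable
        // keyed on Sym, so a constituent exists for all three symbols.
        final PartitionedTable partitioned = source.partitionedAggBy(aggs, true, initialGroups, "Sym");

        TableTools.show(bySym);
    }
}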
@@ -26,6 +26,7 @@

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractColumnSource<T> implements
@@ -42,6 +43,7 @@ public abstract class AbstractColumnSource<T> implements
protected final Class<?> componentType;

protected volatile Map<T, RowSet> groupToRange;
protected volatile List<ColumnSource> rowSetIndexerKey;

protected AbstractColumnSource(@NotNull final Class<T> type) {
this(type, Object.class);
@@ -102,6 +104,19 @@ public ColumnSource<T> getPrevSource() {
return new PrevColumnSource<>(this);
}

@Override
public List<ColumnSource> getColumnSources() {
List<ColumnSource> localRowSetIndexerKey;
if ((localRowSetIndexerKey = rowSetIndexerKey) == null) {
synchronized (this) {
if ((localRowSetIndexerKey = rowSetIndexerKey) == null) {
rowSetIndexerKey = localRowSetIndexerKey = Collections.singletonList(this);
}
}
}
return localRowSetIndexerKey;
}

@Override
public Map<T, RowSet> getGroupToRange() {
return groupToRange;
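The getColumnSources() override above caches the singleton list in the volatile rowSetIndexerKey field using double-checked locking, rather than allocating a new list on every call as the removed interface default did. A minimal standalone sketch of the same idiom follows; the class and field names here are illustrative and not from the PR.

import java.util.Collections;
import java.util.List;

public final class LazySingletonList<T> {
    // Must be volatile so a fully initialized list is visible to other threads.
    private volatile List<T> cached;
    private final T value;

    public LazySingletonList(final T value) {
        this.value = value;
    }

    public List<T> get() {
        List<T> local;
        // First, unsynchronized check: the common, already-initialized path.
        if ((local = cached) == null) {
            synchronized (this) {
                // Second check under the lock: exactly one thread initializes.
                if ((local = cached) == null) {
                    cached = local = Collections.singletonList(value);
                }
            }
        }
        return local;
    }
}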
@@ -10,8 +10,10 @@
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.indexer.RowSetIndexer;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.InMemoryColumnSource;
import io.deephaven.engine.table.impl.sources.ObjectArraySource;
import org.apache.commons.lang3.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;
@@ -158,4 +160,25 @@ public static <TYPE> Pair<ArrayBackedColumnSource<TYPE>, ObjectArraySource<Track

return new Pair<>(resultKeyColumnSource, resultIndexColumnSource);
}

/**
* Convert a group-to-RowSet map to a flat, immutable, in-memory column of keys.
*
* @param originalKeyColumnSource The key column source whose contents are reflected by the group-to-RowSet map
* (used for typing, only)
* @param groupToRowSet The group-to-RowSet map to convert
* @return A flat, immutable, in-memory column of keys
*/
public static <TYPE> WritableColumnSource<TYPE> groupingKeysToImmutableFlatSource(
@NotNull final ColumnSource<TYPE> originalKeyColumnSource,
@NotNull final Map<TYPE, RowSet> groupToRowSet) {
final WritableColumnSource<TYPE> destination = InMemoryColumnSource.makeImmutableSource(
originalKeyColumnSource.getType(), originalKeyColumnSource.getComponentType());
destination.ensureCapacity(groupToRowSet.size());
int ri = 0;
for (final TYPE key : groupToRowSet.keySet()) {
destination.set(ri++, key);
}
return destination;
}
}
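A hypothetical call sketch for the new helper above. The enclosing utility class and its package are assumed here to be io.deephaven.engine.table.impl.GroupingUtils (neither is visible in this diff), and the key column and grouping map arguments are placeholders.

import java.util.Map;

import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.GroupingUtils; // package assumed for this sketch

public class FlattenGroupingKeysExample {
    // Flattens the distinct keys of an existing grouping into a dense, immutable column:
    // one row per distinct key at positions 0..size-1, in keySet() iteration order.
    public static <T> WritableColumnSource<T> flattenKeys(
            final ColumnSource<T> keyColumn,
            final Map<T, RowSet> groupToRowSet) {
        return GroupingUtils.groupingKeysToImmutableFlatSource(keyColumn, groupToRowSet);
    }
}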
@@ -160,11 +160,6 @@ public Table join(Table rightTable, MatchPair[] columnsToMatch, MatchPair[] colu
return throwUnsupported("join()");
}

@Override
public Table countBy(String countColumnName, ColumnName... groupByColumns) {
return throwUnsupported("countBy()");
}

@Override
public Table ungroup(boolean nullFill, String... columnsToUngroup) {
return throwUnsupported("ungroup()");
@@ -186,7 +181,7 @@ public Table aggAllBy(AggSpec spec, ColumnName... groupByColumns) {
}

@Override
public Table aggBy(Collection<? extends Aggregation> aggregations,
public Table aggBy(Collection<? extends Aggregation> aggregations, boolean preserveEmpty, Table initialGroups,
Collection<? extends ColumnName> groupByColumns) {
return throwUnsupported("aggBy()");
}
@@ -251,6 +246,12 @@ public PartitionedTable partitionBy(boolean dropKeys, String... keyColumnNames)
return throwUnsupported("partitionBy()");
}

@Override
public PartitionedTable partitionedAggBy(Collection<? extends Aggregation> aggregations, boolean preserveEmpty,
Table initialGroups, String... keyColumnNames) {
return throwUnsupported("partitionedAggBy()");
}

@Override
public Table rollup(Collection<? extends Aggregation> aggregations, boolean includeConstituents,
ColumnName... groupByColumns) {