Skip to content

Commit

Permalink
Make DataIndex accumulation and transformation parallelizable (#5457)
Browse files Browse the repository at this point in the history
* Create a tool for marking SelectColumns that are known to be stateless as such

* Mark FunctionalColumns used in MergedDataIndex and TransformedDataIndex as stateless in order to enable parallelism
  • Loading branch information
rcaudy authored and stanbrub committed May 17, 2024
1 parent 2d7404f commit b8448a6
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.FunctionalColumnLong;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -153,10 +154,10 @@ private Table maybeIntersectAndInvert(@NotNull final Table indexTable) {
final Function<RowSet, RowSet> mutator =
getMutator(transformer.intersectRowSet().orElse(null), transformer.invertRowSet().orElse(null));
final Table mutated = indexTable
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
parentIndex.rowSetColumnName(), RowSet.class,
parentIndex.rowSetColumnName(), RowSet.class,
mutator)));
mutator))));
if (transformer.intersectRowSet().isPresent()) {
return mutated.where(Filter.isNotNull(ColumnName.of(parentIndex.rowSetColumnName())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ static Collection<SelectColumn> copyFrom(Collection<SelectColumn> selectColumns)
return selectColumns.stream().map(SelectColumn::copy).collect(Collectors.toList());
}

/**
 * Adapt {@code selectable} into a SelectColumn that reports itself as {@link #isStateless() stateless}.
 * <p>
 * The input is first converted via {@code of(selectable)}, and that result is then wrapped in a
 * {@code StatelessSelectColumn}.
 *
 * @param selectable The {@link Selectable} to adapt and mark as stateless
 * @return The resulting SelectColumn
 */
static SelectColumn ofStateless(@NotNull final Selectable selectable) {
    final SelectColumn adapted = of(selectable);
    return new StatelessSelectColumn(adapted);
}

/**
* Convenient static final instance of a zero-length array of SelectColumns, for use in toArray calls.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.select;

import io.deephaven.api.ColumnName;
import io.deephaven.api.expression.Expression;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.QueryCompilerRequestProcessor;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Map;

/**
 * {@link SelectColumn} decorator that forwards every operation to a wrapped {@link SelectColumn}, with the single
 * exception of {@link #isStateless()}, which always answers {@code true}.
 * <p>
 * This class performs no check of its own that the wrapped column is actually stateless; it only changes what is
 * reported.
 */
class StatelessSelectColumn implements SelectColumn {

    /** The wrapped column that services every call other than {@link #isStateless()}. */
    private final SelectColumn delegate;

    /**
     * Wrap {@code inner} so that the resulting column reports itself as stateless.
     *
     * @param inner The {@link SelectColumn} to forward to
     */
    StatelessSelectColumn(@NotNull final SelectColumn inner) {
        delegate = inner;
    }

    // ------------------------------------------------------------------------------------------------------------
    // Behavior specific to this wrapper
    // ------------------------------------------------------------------------------------------------------------

    /**
     * @return Always {@code true}, regardless of what the wrapped column would report
     */
    @Override
    public boolean isStateless() {
        return true;
    }

    /**
     * @return A new StatelessSelectColumn wrapping a copy of the wrapped column, so the copy also reports stateless
     */
    @Override
    public SelectColumn copy() {
        final SelectColumn delegateCopy = delegate.copy();
        return new StatelessSelectColumn(delegateCopy);
    }

    // ------------------------------------------------------------------------------------------------------------
    // Initialization: forwarded unchanged to the wrapped column
    // ------------------------------------------------------------------------------------------------------------

    @Override
    public List<String> initInputs(
            @NotNull final TrackingRowSet rowSet,
            @NotNull final Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
        return delegate.initInputs(rowSet, columnsOfInterest);
    }

    @Override
    public List<String> initDef(@NotNull final Map<String, ColumnDefinition<?>> columnDefinitionMap) {
        return delegate.initDef(columnDefinitionMap);
    }

    @Override
    public List<String> initDef(
            @NotNull final Map<String, ColumnDefinition<?>> columnDefinitionMap,
            @NotNull final QueryCompilerRequestProcessor compilationRequestProcessor) {
        return delegate.initDef(columnDefinitionMap, compilationRequestProcessor);
    }

    @Override
    public void validateSafeForRefresh(@NotNull final BaseTable<?> sourceTable) {
        delegate.validateSafeForRefresh(sourceTable);
    }

    // ------------------------------------------------------------------------------------------------------------
    // Result type and input dependencies: forwarded unchanged to the wrapped column
    // ------------------------------------------------------------------------------------------------------------

    @Override
    public Class<?> getReturnedType() {
        return delegate.getReturnedType();
    }

    @Override
    public Class<?> getReturnedComponentType() {
        return delegate.getReturnedComponentType();
    }

    @Override
    public List<String> getColumns() {
        return delegate.getColumns();
    }

    @Override
    public List<String> getColumnArrays() {
        return delegate.getColumnArrays();
    }

    // ------------------------------------------------------------------------------------------------------------
    // Views and destination sources: forwarded unchanged to the wrapped column
    // ------------------------------------------------------------------------------------------------------------

    @Override
    @NotNull
    public ColumnSource<?> getDataView() {
        return delegate.getDataView();
    }

    @Override
    @NotNull
    public ColumnSource<?> getLazyView() {
        return delegate.getLazyView();
    }

    @Override
    public WritableColumnSource<?> newDestInstance(final long size) {
        return delegate.newDestInstance(size);
    }

    @Override
    public WritableColumnSource<?> newFlatDestInstance(final long size) {
        return delegate.newFlatDestInstance(size);
    }

    // ------------------------------------------------------------------------------------------------------------
    // Naming and API adapters: forwarded unchanged to the wrapped column
    // ------------------------------------------------------------------------------------------------------------

    @Override
    public String getName() {
        return delegate.getName();
    }

    @Override
    public MatchPair getMatchPair() {
        return delegate.getMatchPair();
    }

    @Override
    public boolean isRetain() {
        return delegate.isRetain();
    }

    @Override
    public ColumnName newColumn() {
        return delegate.newColumn();
    }

    @Override
    public Expression expression() {
        return delegate.expression();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.vector.ObjectVector;
Expand Down Expand Up @@ -138,11 +139,11 @@ private Table buildTable() {
// pages during the accumulation phase.
final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
final Table locationDataIndexes = locationTable
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
columnSourceManager.locationColumnName(), TableLocation.class,
LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class,
(final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets(
locationRowKey, location, keyColumnNamesArray))))
locationRowKey, location, keyColumnNamesArray)))))
.dropColumns(columnSourceManager.locationColumnName());

// Merge all the location index tables into a single table
Expand All @@ -153,10 +154,10 @@ private Table buildTable() {

// Combine the row sets from each group into a single row set
final Table combined = groupedByKeyColumns
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
ROW_SET_COLUMN_NAME, ObjectVector.class,
ROW_SET_COLUMN_NAME, RowSet.class,
this::mergeRowSets)));
this::mergeRowSets))));
Assert.assertion(combined.isFlat(), "combined.isFlat()");
Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");

Expand All @@ -180,11 +181,11 @@ private static Table loadIndexTableAndShiftRowSets(
String.join(", ", keyColumnNames), location));
}
final Table indexTable = dataIndex.table();
return indexTable.coalesce().update(List.of(new FunctionalColumn<>(
return indexTable.coalesce().update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
dataIndex.rowSetColumnName(), RowSet.class,
ROW_SET_COLUMN_NAME, RowSet.class,
(final RowSet rowSet) -> rowSet
.shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey))))));
.shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)))))));
}

private RowSet mergeRowSets(
Expand Down

0 comments on commit b8448a6

Please sign in to comment.