Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make DataIndex accumulation and transformation parallelizable #5457

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.FunctionalColumnLong;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -153,10 +154,10 @@ private Table maybeIntersectAndInvert(@NotNull final Table indexTable) {
final Function<RowSet, RowSet> mutator =
getMutator(transformer.intersectRowSet().orElse(null), transformer.invertRowSet().orElse(null));
final Table mutated = indexTable
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
parentIndex.rowSetColumnName(), RowSet.class,
parentIndex.rowSetColumnName(), RowSet.class,
mutator)));
mutator))));
if (transformer.intersectRowSet().isPresent()) {
return mutated.where(Filter.isNotNull(ColumnName.of(parentIndex.rowSetColumnName())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ static Collection<SelectColumn> copyFrom(Collection<SelectColumn> selectColumns)
return selectColumns.stream().map(SelectColumn::copy).collect(Collectors.toList());
}

/**
 * Adapt {@code selectable} into a SelectColumn and wrap the result so that it reports itself as
 * {@link #isStateless() stateless}.
 * <p>
 * The adaptation is performed by {@code of(selectable)}; the adapted column is then handed to a
 * {@link StatelessSelectColumn}. No check is made here that the adapted column actually is stateless.
 *
 * @param selectable The {@link Selectable} to adapt and mark as stateless
 * @return The wrapping SelectColumn
 */
static SelectColumn ofStateless(@NotNull final Selectable selectable) {
    final SelectColumn adapted = of(selectable);
    return new StatelessSelectColumn(adapted);
}

/**
* Convenient static final instance of a zero length Array of SelectColumns for use in toArray calls.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.select;

import io.deephaven.api.ColumnName;
import io.deephaven.api.expression.Expression;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.MatchPair;
import io.deephaven.engine.table.impl.QueryCompilerRequestProcessor;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Map;

/**
* {@link SelectColumn} implementation that wraps another {@link SelectColumn} and makes it report to be
* {@link #isStateless() stateless}.
*/
class StatelessSelectColumn implements SelectColumn {

private final SelectColumn inner;

StatelessSelectColumn(@NotNull final SelectColumn inner) {
this.inner = inner;
}

@Override
public List<String> initInputs(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> columnsOfInterest) {
return inner.initInputs(rowSet, columnsOfInterest);
}

@Override
public List<String> initDef(@NotNull Map<String, ColumnDefinition<?>> columnDefinitionMap) {
return inner.initDef(columnDefinitionMap);
}

@Override
public List<String> initDef(@NotNull Map<String, ColumnDefinition<?>> columnDefinitionMap,
@NotNull QueryCompilerRequestProcessor compilationRequestProcessor) {
return inner.initDef(columnDefinitionMap, compilationRequestProcessor);
}

@Override
public Class<?> getReturnedType() {
return inner.getReturnedType();
}

@Override
public Class<?> getReturnedComponentType() {
return inner.getReturnedComponentType();
}

@Override
public List<String> getColumns() {
return inner.getColumns();
}

@Override
public List<String> getColumnArrays() {
return inner.getColumnArrays();
}

@Override
public @NotNull ColumnSource<?> getDataView() {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
return inner.getDataView();
}

@Override
public @NotNull ColumnSource<?> getLazyView() {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
return inner.getLazyView();
}

@Override
public String getName() {
return inner.getName();
}

@Override
public MatchPair getMatchPair() {
return inner.getMatchPair();
}

@Override
public WritableColumnSource<?> newDestInstance(long size) {
return inner.newDestInstance(size);
}

@Override
public WritableColumnSource<?> newFlatDestInstance(long size) {
return inner.newFlatDestInstance(size);
}

@Override
public boolean isRetain() {
return inner.isRetain();
}

@Override
public void validateSafeForRefresh(BaseTable<?> sourceTable) {
inner.validateSafeForRefresh(sourceTable);
}

@Override
public boolean isStateless() {
return true;
}

@Override
public SelectColumn copy() {
return new StatelessSelectColumn(inner.copy());
}

@Override
public ColumnName newColumn() {
return inner.newColumn();
}

@Override
public Expression expression() {
return inner.expression();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.SelectColumn;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.InternalUseOnly;
import io.deephaven.vector.ObjectVector;
Expand Down Expand Up @@ -138,11 +139,11 @@ private Table buildTable() {
// pages during the accumulation phase.
final String[] keyColumnNamesArray = keyColumnNames.toArray(String[]::new);
final Table locationDataIndexes = locationTable
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
columnSourceManager.locationColumnName(), TableLocation.class,
LOCATION_DATA_INDEX_TABLE_COLUMN_NAME, Table.class,
(final long locationRowKey, final TableLocation location) -> loadIndexTableAndShiftRowSets(
locationRowKey, location, keyColumnNamesArray))))
locationRowKey, location, keyColumnNamesArray)))))
.dropColumns(columnSourceManager.locationColumnName());

// Merge all the location index tables into a single table
Expand All @@ -153,10 +154,10 @@ private Table buildTable() {

// Combine the row sets from each group into a single row set
final Table combined = groupedByKeyColumns
.update(List.of(new FunctionalColumn<>(
.update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
ROW_SET_COLUMN_NAME, ObjectVector.class,
ROW_SET_COLUMN_NAME, RowSet.class,
this::mergeRowSets)));
this::mergeRowSets))));
Assert.assertion(combined.isFlat(), "combined.isFlat()");
Assert.eq(groupedByKeyColumns.size(), "groupedByKeyColumns.size()", combined.size(), "combined.size()");

Expand All @@ -180,11 +181,11 @@ private static Table loadIndexTableAndShiftRowSets(
String.join(", ", keyColumnNames), location));
}
final Table indexTable = dataIndex.table();
return indexTable.coalesce().update(List.of(new FunctionalColumn<>(
return indexTable.coalesce().update(List.of(SelectColumn.ofStateless(new FunctionalColumn<>(
dataIndex.rowSetColumnName(), RowSet.class,
ROW_SET_COLUMN_NAME, RowSet.class,
(final RowSet rowSet) -> rowSet
.shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey))))));
.shift(RegionedColumnSource.getFirstRowKey(Math.toIntExact(locationRowKey)))))));
}

private RowSet mergeRowSets(
Expand Down
Loading