Skip to content

Commit

Permalink
Force PartitionAwareSourceTable to coalesce whenever a partition filt…
Browse files Browse the repository at this point in the history
…er is applied via where (#5485)

* Always coalesce PartitionAwareSourceTable whenever a partitioning filter is encountered, to restore desired UI behavior

* Rename a misnamed constant

* Delete unused method in BaseArrayBackedInputTable

* DeferredViewTable: Fix filter copy bug in doCoalesce. Fix filter initialization for renames. Clarify delegation/coalescing behavior for TableReference.
  • Loading branch information
rcaudy authored and stanbrub committed May 17, 2024
1 parent 02b11bf commit 59de583
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

import io.deephaven.api.Selectable;
import io.deephaven.api.filter.Filter;
import io.deephaven.base.reference.SimpleReference;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.Liveness;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.table.impl.select.MatchFilter;
import io.deephaven.util.QueryConstants;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.impl.select.*;
Expand Down Expand Up @@ -49,7 +48,7 @@ public DeferredViewTable(@NotNull final TableDefinition definition,
final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
SelectAndViewAnalyzer.initializeSelectColumns(
parentDefinition.getColumnNameMap(), this.deferredViewColumns, compilationProcessor);
this.deferredFilters = deferredFilters == null ? WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY : deferredFilters;
this.deferredFilters = deferredFilters == null ? WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY : deferredFilters;
for (final WhereFilter sf : this.deferredFilters) {
sf.init(parentDefinition, compilationProcessor);
if (sf instanceof LivenessReferent && sf.isRefreshing()) {
Expand Down Expand Up @@ -89,51 +88,49 @@ public Table where(Filter filter) {
}

private Table getResultTableWithWhere(WhereFilter... whereFilters) {
if (getCoalesced() != null) {
return coalesce().where(Filter.and(whereFilters));
{
final Table coalesced = getCoalesced();
if (Liveness.verifyCachedObjectForReuse(coalesced)) {
return coalesced.where(Filter.and(whereFilters));
}
}

final WhereFilter[] allFilters = Stream.concat(Arrays.stream(deferredFilters), Arrays.stream(whereFilters))
.map(WhereFilter::copy).toArray(WhereFilter[]::new);
final WhereFilter[] allFilters = Stream.concat(
Arrays.stream(deferredFilters).map(WhereFilter::copy),
Arrays.stream(whereFilters))
.toArray(WhereFilter[]::new);

TableReference.TableAndRemainingFilters tableAndRemainingFilters;
if (allFilters.length == 0) {
tableAndRemainingFilters = tableReference.getWithWhere();
Table result = tableAndRemainingFilters.table;
Table result = tableReference.get();
result = applyDeferredViews(result);
result = result.where(Filter.and(tableAndRemainingFilters.remainingFilters));
copyAttributes((BaseTable<?>) result, CopyAttributeOperation.Coalesce);
setCoalesced(result);
return result;
}

PreAndPostFilters preAndPostFilters = applyFilterRenamings(allFilters);
tableAndRemainingFilters = tableReference.getWithWhere(preAndPostFilters.preViewFilters);
final PreAndPostFilters preAndPostFilters = applyFilterRenamings(allFilters);
final TableReference.TableAndRemainingFilters tableAndRemainingFilters =
tableReference.getWithWhere(preAndPostFilters.preViewFilters);

Table localResult = tableAndRemainingFilters.table;
if (localResult instanceof DeferredViewTable) {
localResult = ((DeferredViewTable) localResult)
.getResultTableWithWhere(tableAndRemainingFilters.remainingFilters);
} else {
localResult =
localResult.where(Filter.and(WhereFilter.copyFrom(tableAndRemainingFilters.remainingFilters)));
if (tableAndRemainingFilters.remainingFilters.length != 0) {
localResult = localResult.where(Filter.and(tableAndRemainingFilters.remainingFilters));
}

localResult = applyDeferredViews(localResult);
if (preAndPostFilters.postViewFilters.length > 0) {
localResult = localResult.where(Filter.and(preAndPostFilters.postViewFilters));
}

if (whereFilters.length == 0) {
// The result is effectively the same as if we called doCoalesce()
copyAttributes((BaseTable<?>) localResult, CopyAttributeOperation.Coalesce);
setCoalesced(localResult);
}
return localResult;
}

private Table applyDeferredViews(Table result) {
if (result instanceof DeferredViewTable) {
result = result.coalesce();
}
if (deferredDropColumns.length > 0) {
result = result.dropColumns(deferredDropColumns);
}
Expand Down Expand Up @@ -213,35 +210,43 @@ private PreAndPostFilters applyFilterRenamings(WhereFilter[] filters) {
if (myRenames.isEmpty()) {
preViewFilters.add(filter);
} else if (filter instanceof MatchFilter) {
MatchFilter matchFilter = (MatchFilter) filter;
final MatchFilter matchFilter = (MatchFilter) filter;
Assert.assertion(myRenames.size() == 1, "Match Filters should only use one column!");
String newName = myRenames.get(matchFilter.getColumnName());
Assert.neqNull(newName, "newName");
preViewFilters.add(matchFilter.renameFilter(newName));
final MatchFilter newFilter = matchFilter.renameFilter(newName);
newFilter.init(tableReference.getDefinition(), compilationProcessor);
preViewFilters.add(newFilter);
} else if (filter instanceof ConditionFilter) {
ConditionFilter conditionFilter = (ConditionFilter) filter;
preViewFilters.add(conditionFilter.renameFilter(myRenames));
final ConditionFilter conditionFilter = (ConditionFilter) filter;
final ConditionFilter newFilter = conditionFilter.renameFilter(myRenames);
newFilter.init(tableReference.getDefinition(), compilationProcessor);
preViewFilters.add(newFilter);
} else {
postViewFilters.add(filter);
}
}
compilationProcessor.compile();

return new PreAndPostFilters(preViewFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY),
postViewFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
return new PreAndPostFilters(preViewFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY),
postViewFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

@Override
protected Table doCoalesce() {
Table result;
if (deferredFilters.length > 0) {
PreAndPostFilters preAndPostFilters = applyFilterRenamings(deferredFilters);

TableReference.TableAndRemainingFilters tarf =
final PreAndPostFilters preAndPostFilters = applyFilterRenamings(WhereFilter.copyFrom(deferredFilters));
final TableReference.TableAndRemainingFilters tarf =
tableReference.getWithWhere(preAndPostFilters.preViewFilters);
result = tarf.table;
result = result.where(Filter.and(tarf.remainingFilters));
result = result.where(Filter.and(preAndPostFilters.postViewFilters));
if (tarf.remainingFilters.length != 0) {
result = result.where(Filter.and(tarf.remainingFilters));
}
result = applyDeferredViews(result);
if (preAndPostFilters.postViewFilters.length > 0) {
result = result.where(Filter.and(preAndPostFilters.postViewFilters));
}
} else {
result = tableReference.get();
result = applyDeferredViews(result);
Expand All @@ -253,8 +258,11 @@ protected Table doCoalesce() {
@Override
public Table selectDistinct(Collection<? extends Selectable> columns) {
/* If the cachedResult table has already been created, we can just use that. */
if (getCoalesced() != null) {
return coalesce().selectDistinct(columns);
{
final Table coalesced = getCoalesced();
if (Liveness.verifyCachedObjectForReuse(coalesced)) {
return coalesced.selectDistinct(columns);
}
}

/* If we have any manual filters, then we must coalesce the table. */
Expand All @@ -277,7 +285,7 @@ public Table selectDistinct(Collection<? extends Selectable> columns) {

@Override
protected DeferredViewTable copy() {
final DeferredViewTable result = new DeferredViewTable(definition, description, new SimpleTableReference(this),
final DeferredViewTable result = new DeferredViewTable(definition, description, new TableReference(this),
null, null, null);
LiveAttributeMap.copyAttributes(this, result, ak -> true);
return result;
Expand All @@ -291,24 +299,27 @@ protected Table redefine(TableDefinition newDefinition) {
newView[cdi] = new SourceColumn(cDefs.get(cdi).getName());
}
return new DeferredViewTable(newDefinition, description + "-redefined",
new SimpleTableReference(this), null, newView, null);
new TableReference(this), null, newView, null);
}

@Override
protected Table redefine(TableDefinition newDefinitionExternal, TableDefinition newDefinitionInternal,
SelectColumn[] viewColumns) {
return new DeferredViewTable(newDefinitionExternal, description + "-redefined",
new SimpleTableReference(this), null, viewColumns, null);
new TableReference(this), null, viewColumns, null);
}

/**
* The table reference hides the table underlying table from us.
*/
public static abstract class TableReference extends LivenessArtifact implements SimpleReference<Table> {
public static class TableReference extends LivenessArtifact {

protected final Table table;

private final boolean isRefreshing;

TableReference(Table t) {
this.table = t;
isRefreshing = t.isRefreshing();
if (isRefreshing) {
manage(t);
Expand All @@ -326,26 +337,23 @@ public final boolean isRefreshing() {

/**
* Returns the table in a form that the user can run queries on it. This may be as simple as returning a
* reference, but for amorphous tables, this means we need to do the work to instantiate it.
* reference, but for uncoalesced tables, this means we need to do the work to instantiate it.
*
* @return the table
*/
public abstract Table get();
public Table get() {
return table.coalesce();
}

/**
* Get the definition, without instantiating the table.
*
* @return the definition of the table
*/

public abstract TableDefinition getDefinition();

/**
* What size should the uninitialized table return.
*
* @return the size
*/
public abstract long getSize();
public TableDefinition getDefinition() {
return table.getDefinition();
}

public static class TableAndRemainingFilters {

Expand All @@ -359,8 +367,8 @@ public TableAndRemainingFilters(Table table, WhereFilter[] remainingFilters) {
}

/**
* Get the table in a form that the user can run queries on it. All of the filters that can be run efficiently
* should be run before instantiating the full table should be run. Other filters are returned in the
* Get the table in a form that the user can run queries on it. All the filters that can be run efficiently
* should be run before coalescing the full table should be run. Other filters are returned in the
* remainingFilters field.
*
* @param whereFilters filters to maybe apply before returning the table
Expand All @@ -371,43 +379,15 @@ protected TableAndRemainingFilters getWithWhere(WhereFilter... whereFilters) {
}

/**
* If possible to execute a selectDistinct without instantiating the full table, then do so. Otherwise return
* If possible to execute a selectDistinct without instantiating the full table, then do so. Otherwise, return
* null.
*
* @param columns the columns to selectDistinct
* @return null if the operation can not be performed on an uninstantiated table, otherwise a new table with the
* @return null if the operation can not be performed on an uncoalesced table, otherwise a new table with the
* distinct values from strColumns.
*/
public Table selectDistinctInternal(Collection<? extends Selectable> columns) {
return null;
}

@Override
public final void clear() {}
}

public static class SimpleTableReference extends TableReference {

private final Table table;

public SimpleTableReference(Table table) {
super(table);
this.table = table;
}

@Override
public long getSize() {
return QueryConstants.NULL_LONG;
}

@Override
public TableDefinition getDefinition() {
return table.getDefinition();
}

@Override
public Table get() {
return table;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,32 +101,32 @@ private static Map<String, ColumnDefinition<?>> extractPartitioningColumnDefinit
LinkedHashMap::new));
}

private static class PartitionAwareQueryTableReference extends QueryTableReference {
private static class PartitionAwareTableReference extends DeferredViewTable.TableReference {

private PartitionAwareQueryTableReference(PartitionAwareSourceTable table) {
private PartitionAwareTableReference(PartitionAwareSourceTable table) {
super(table);
}

@Override
protected TableAndRemainingFilters getWithWhere(WhereFilter... whereFilters) {
final List<WhereFilter> partitionFilters = new ArrayList<>();
final List<WhereFilter> deferredFilters = new ArrayList<>();
final List<WhereFilter> otherFilters = new ArrayList<>();
for (WhereFilter whereFilter : whereFilters) {
if (!(whereFilter instanceof ReindexingFilter)
&& ((PartitionAwareSourceTable) table).isValidAgainstColumnPartitionTable(
whereFilter.getColumns(), whereFilter.getColumnArrays())) {
partitionFilters.add(whereFilter);
} else {
deferredFilters.add(whereFilter);
otherFilters.add(whereFilter);
}
}

final Table result = partitionFilters.isEmpty()
? table.coalesce()
? table
: table.where(Filter.and(partitionFilters));

return new TableAndRemainingFilters(result,
deferredFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
return new TableAndRemainingFilters(result.coalesce(),
otherFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

@Override
Expand Down Expand Up @@ -188,7 +188,7 @@ protected final BaseTable<?> redefine(@NotNull final TableDefinition newDefiniti
componentFactory, locationProvider, updateSourceRegistrar, partitioningColumnDefinitions,
partitioningColumnFilters);
return new DeferredViewTable(newDefinition, description + "-retainColumns",
new PartitionAwareQueryTableReference(redefined),
new PartitionAwareTableReference(redefined),
droppedPartitioningColumnDefinitions.stream().map(ColumnDefinition::getName).toArray(String[]::new),
null, null);
}
Expand All @@ -198,8 +198,8 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
SelectColumn[] viewColumns) {
BaseTable<?> redefined = redefine(newDefinitionInternal);
DeferredViewTable.TableReference reference = redefined instanceof PartitionAwareSourceTable
? new PartitionAwareQueryTableReference((PartitionAwareSourceTable) redefined)
: new DeferredViewTable.SimpleTableReference(redefined);
? new PartitionAwareTableReference((PartitionAwareSourceTable) redefined)
: new DeferredViewTable.TableReference(redefined);
return new DeferredViewTable(newDefinitionExternal, description + "-redefined",
reference, null, viewColumns, null);
}
Expand Down Expand Up @@ -260,28 +260,33 @@ private Table whereImpl(final WhereFilter[] whereFilters) {

final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
final List<WhereFilter> partitionFilters = new ArrayList<>();
final List<WhereFilter> deferredFilters = new ArrayList<>();
final List<WhereFilter> otherFilters = new ArrayList<>();
for (WhereFilter whereFilter : whereFilters) {
whereFilter.init(definition, compilationProcessor);
if (!(whereFilter instanceof ReindexingFilter)
&& isValidAgainstColumnPartitionTable(whereFilter.getColumns(), whereFilter.getColumnArrays())) {
partitionFilters.add(whereFilter);
} else {
deferredFilters.add(whereFilter);
otherFilters.add(whereFilter);
}
}
compilationProcessor.compile();

final PartitionAwareSourceTable withPartitionsFiltered = partitionFilters.isEmpty()
? this
: QueryPerformanceRecorder.withNugget("getFilteredTable(" + partitionFilters + ")",
() -> getFilteredTable(partitionFilters));
// If we have no partition filters, we defer all filters.
if (partitionFilters.isEmpty()) {
return new DeferredViewTable(definition, getDescription() + "-withDeferredFilters",
new PartitionAwareTableReference(this), null, null,
otherFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

return deferredFilters.isEmpty()
? withPartitionsFiltered
: new DeferredViewTable(definition, withPartitionsFiltered.getDescription() + "-withDeferredFilters",
new PartitionAwareQueryTableReference(withPartitionsFiltered), null, null,
deferredFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
// If we have any partition filters, we first create a new instance that filters the location keys accordingly,
// then coalesce, and then apply the remaining filters to the coalesced result.
final Table withPartitionsFiltered = QueryPerformanceRecorder.withNugget(
"getFilteredTable(" + partitionFilters + ")", () -> getFilteredTable(partitionFilters));
final Table coalesced = withPartitionsFiltered.coalesce();
return otherFilters.isEmpty()
? coalesced
: coalesced.where(Filter.and(otherFilters));
}

@Override
Expand Down
Loading

0 comments on commit 59de583

Please sign in to comment.