Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force PartitionAwareSourceTable to coalesce whenever a partition filter is applied via where #5485

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

import io.deephaven.api.Selectable;
import io.deephaven.api.filter.Filter;
import io.deephaven.base.reference.SimpleReference;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.liveness.Liveness;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer;
import io.deephaven.engine.table.impl.select.MatchFilter;
import io.deephaven.util.QueryConstants;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.table.impl.select.*;
Expand Down Expand Up @@ -49,7 +48,7 @@ public DeferredViewTable(@NotNull final TableDefinition definition,
final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
SelectAndViewAnalyzer.initializeSelectColumns(
parentDefinition.getColumnNameMap(), this.deferredViewColumns, compilationProcessor);
this.deferredFilters = deferredFilters == null ? WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY : deferredFilters;
this.deferredFilters = deferredFilters == null ? WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY : deferredFilters;
for (final WhereFilter sf : this.deferredFilters) {
sf.init(parentDefinition, compilationProcessor);
if (sf instanceof LivenessReferent && sf.isRefreshing()) {
Expand Down Expand Up @@ -89,51 +88,49 @@ public Table where(Filter filter) {
}

private Table getResultTableWithWhere(WhereFilter... whereFilters) {
if (getCoalesced() != null) {
return coalesce().where(Filter.and(whereFilters));
{
final Table coalesced = getCoalesced();
if (Liveness.verifyCachedObjectForReuse(coalesced)) {
return coalesced.where(Filter.and(whereFilters));
}
}

final WhereFilter[] allFilters = Stream.concat(Arrays.stream(deferredFilters), Arrays.stream(whereFilters))
.map(WhereFilter::copy).toArray(WhereFilter[]::new);
final WhereFilter[] allFilters = Stream.concat(
Arrays.stream(deferredFilters).map(WhereFilter::copy),
Arrays.stream(whereFilters))
.toArray(WhereFilter[]::new);

TableReference.TableAndRemainingFilters tableAndRemainingFilters;
if (allFilters.length == 0) {
tableAndRemainingFilters = tableReference.getWithWhere();
Table result = tableAndRemainingFilters.table;
Table result = tableReference.get();
result = applyDeferredViews(result);
result = result.where(Filter.and(tableAndRemainingFilters.remainingFilters));
copyAttributes((BaseTable<?>) result, CopyAttributeOperation.Coalesce);
setCoalesced(result);
return result;
}

PreAndPostFilters preAndPostFilters = applyFilterRenamings(allFilters);
tableAndRemainingFilters = tableReference.getWithWhere(preAndPostFilters.preViewFilters);
final PreAndPostFilters preAndPostFilters = applyFilterRenamings(allFilters);
final TableReference.TableAndRemainingFilters tableAndRemainingFilters =
tableReference.getWithWhere(preAndPostFilters.preViewFilters);

Table localResult = tableAndRemainingFilters.table;
if (localResult instanceof DeferredViewTable) {
localResult = ((DeferredViewTable) localResult)
.getResultTableWithWhere(tableAndRemainingFilters.remainingFilters);
} else {
localResult =
localResult.where(Filter.and(WhereFilter.copyFrom(tableAndRemainingFilters.remainingFilters)));
if (tableAndRemainingFilters.remainingFilters.length != 0) {
localResult = localResult.where(Filter.and(tableAndRemainingFilters.remainingFilters));
cpwright marked this conversation as resolved.
Show resolved Hide resolved
}

localResult = applyDeferredViews(localResult);
if (preAndPostFilters.postViewFilters.length > 0) {
localResult = localResult.where(Filter.and(preAndPostFilters.postViewFilters));
}

if (whereFilters.length == 0) {
// The result is effectively the same as if we called doCoalesce()
copyAttributes((BaseTable<?>) localResult, CopyAttributeOperation.Coalesce);
setCoalesced(localResult);
}
return localResult;
}

private Table applyDeferredViews(Table result) {
if (result instanceof DeferredViewTable) {
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
result = result.coalesce();
}
if (deferredDropColumns.length > 0) {
result = result.dropColumns(deferredDropColumns);
}
Expand Down Expand Up @@ -213,35 +210,43 @@ private PreAndPostFilters applyFilterRenamings(WhereFilter[] filters) {
if (myRenames.isEmpty()) {
preViewFilters.add(filter);
} else if (filter instanceof MatchFilter) {
MatchFilter matchFilter = (MatchFilter) filter;
final MatchFilter matchFilter = (MatchFilter) filter;
Assert.assertion(myRenames.size() == 1, "Match Filters should only use one column!");
String newName = myRenames.get(matchFilter.getColumnName());
Assert.neqNull(newName, "newName");
preViewFilters.add(matchFilter.renameFilter(newName));
final MatchFilter newFilter = matchFilter.renameFilter(newName);
newFilter.init(tableReference.getDefinition(), compilationProcessor);
preViewFilters.add(newFilter);
} else if (filter instanceof ConditionFilter) {
ConditionFilter conditionFilter = (ConditionFilter) filter;
preViewFilters.add(conditionFilter.renameFilter(myRenames));
final ConditionFilter conditionFilter = (ConditionFilter) filter;
final ConditionFilter newFilter = conditionFilter.renameFilter(myRenames);
newFilter.init(tableReference.getDefinition(), compilationProcessor);
preViewFilters.add(newFilter);
} else {
postViewFilters.add(filter);
}
}
compilationProcessor.compile();

return new PreAndPostFilters(preViewFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY),
postViewFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
return new PreAndPostFilters(preViewFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY),
postViewFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

@Override
protected Table doCoalesce() {
Table result;
if (deferredFilters.length > 0) {
PreAndPostFilters preAndPostFilters = applyFilterRenamings(deferredFilters);

TableReference.TableAndRemainingFilters tarf =
final PreAndPostFilters preAndPostFilters = applyFilterRenamings(WhereFilter.copyFrom(deferredFilters));
final TableReference.TableAndRemainingFilters tarf =
tableReference.getWithWhere(preAndPostFilters.preViewFilters);
result = tarf.table;
result = result.where(Filter.and(tarf.remainingFilters));
result = result.where(Filter.and(preAndPostFilters.postViewFilters));
if (tarf.remainingFilters.length != 0) {
result = result.where(Filter.and(tarf.remainingFilters));
}
result = applyDeferredViews(result);
if (preAndPostFilters.postViewFilters.length > 0) {
result = result.where(Filter.and(preAndPostFilters.postViewFilters));
}
} else {
result = tableReference.get();
result = applyDeferredViews(result);
Expand All @@ -253,8 +258,11 @@ protected Table doCoalesce() {
@Override
public Table selectDistinct(Collection<? extends Selectable> columns) {
/* If the cachedResult table has already been created, we can just use that. */
if (getCoalesced() != null) {
return coalesce().selectDistinct(columns);
{
final Table coalesced = getCoalesced();
if (Liveness.verifyCachedObjectForReuse(coalesced)) {
return coalesced.selectDistinct(columns);
}
}

/* If we have any manual filters, then we must coalesce the table. */
Expand All @@ -277,7 +285,7 @@ public Table selectDistinct(Collection<? extends Selectable> columns) {

@Override
protected DeferredViewTable copy() {
final DeferredViewTable result = new DeferredViewTable(definition, description, new SimpleTableReference(this),
final DeferredViewTable result = new DeferredViewTable(definition, description, new TableReference(this),
null, null, null);
LiveAttributeMap.copyAttributes(this, result, ak -> true);
return result;
Expand All @@ -291,24 +299,27 @@ protected Table redefine(TableDefinition newDefinition) {
newView[cdi] = new SourceColumn(cDefs.get(cdi).getName());
}
return new DeferredViewTable(newDefinition, description + "-redefined",
new SimpleTableReference(this), null, newView, null);
new TableReference(this), null, newView, null);
}

@Override
protected Table redefine(TableDefinition newDefinitionExternal, TableDefinition newDefinitionInternal,
SelectColumn[] viewColumns) {
return new DeferredViewTable(newDefinitionExternal, description + "-redefined",
new SimpleTableReference(this), null, viewColumns, null);
new TableReference(this), null, viewColumns, null);
}

/**
* The table reference hides the table underlying table from us.
*/
public static abstract class TableReference extends LivenessArtifact implements SimpleReference<Table> {
public static class TableReference extends LivenessArtifact {

protected final Table table;

private final boolean isRefreshing;

TableReference(Table t) {
this.table = t;
isRefreshing = t.isRefreshing();
if (isRefreshing) {
manage(t);
Expand All @@ -326,26 +337,23 @@ public final boolean isRefreshing() {

/**
* Returns the table in a form that the user can run queries on it. This may be as simple as returning a
* reference, but for amorphous tables, this means we need to do the work to instantiate it.
* reference, but for uncoalesced tables, this means we need to do the work to instantiate it.
*
* @return the table
*/
public abstract Table get();
public Table get() {
return table.coalesce();
}

/**
* Get the definition, without instantiating the table.
*
* @return the definition of the table
*/

public abstract TableDefinition getDefinition();

/**
* What size should the uninitialized table return.
*
* @return the size
*/
public abstract long getSize();
public TableDefinition getDefinition() {
return table.getDefinition();
}

public static class TableAndRemainingFilters {

Expand All @@ -359,8 +367,8 @@ public TableAndRemainingFilters(Table table, WhereFilter[] remainingFilters) {
}

/**
* Get the table in a form that the user can run queries on it. All of the filters that can be run efficiently
* should be run before instantiating the full table should be run. Other filters are returned in the
* Get the table in a form that the user can run queries on it. All the filters that can be run efficiently
* should be run before coalescing the full table should be run. Other filters are returned in the
* remainingFilters field.
*
* @param whereFilters filters to maybe apply before returning the table
Expand All @@ -371,43 +379,15 @@ protected TableAndRemainingFilters getWithWhere(WhereFilter... whereFilters) {
}

/**
* If possible to execute a selectDistinct without instantiating the full table, then do so. Otherwise return
* If possible to execute a selectDistinct without instantiating the full table, then do so. Otherwise, return
* null.
*
* @param columns the columns to selectDistinct
* @return null if the operation can not be performed on an uninstantiated table, otherwise a new table with the
* @return null if the operation can not be performed on an uncoalesced table, otherwise a new table with the
* distinct values from strColumns.
*/
public Table selectDistinctInternal(Collection<? extends Selectable> columns) {
return null;
}

@Override
public final void clear() {}
}

public static class SimpleTableReference extends TableReference {

private final Table table;

public SimpleTableReference(Table table) {
super(table);
this.table = table;
}

@Override
public long getSize() {
return QueryConstants.NULL_LONG;
}

@Override
public TableDefinition getDefinition() {
return table.getDefinition();
}

@Override
public Table get() {
return table;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,32 +101,32 @@ private static Map<String, ColumnDefinition<?>> extractPartitioningColumnDefinit
LinkedHashMap::new));
}

private static class PartitionAwareQueryTableReference extends QueryTableReference {
private static class PartitionAwareTableReference extends DeferredViewTable.TableReference {

private PartitionAwareQueryTableReference(PartitionAwareSourceTable table) {
private PartitionAwareTableReference(PartitionAwareSourceTable table) {
super(table);
}

@Override
protected TableAndRemainingFilters getWithWhere(WhereFilter... whereFilters) {
final List<WhereFilter> partitionFilters = new ArrayList<>();
final List<WhereFilter> deferredFilters = new ArrayList<>();
final List<WhereFilter> otherFilters = new ArrayList<>();
for (WhereFilter whereFilter : whereFilters) {
if (!(whereFilter instanceof ReindexingFilter)
&& ((PartitionAwareSourceTable) table).isValidAgainstColumnPartitionTable(
whereFilter.getColumns(), whereFilter.getColumnArrays())) {
partitionFilters.add(whereFilter);
} else {
deferredFilters.add(whereFilter);
otherFilters.add(whereFilter);
}
}

final Table result = partitionFilters.isEmpty()
? table.coalesce()
? table
: table.where(Filter.and(partitionFilters));

return new TableAndRemainingFilters(result,
deferredFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
return new TableAndRemainingFilters(result.coalesce(),
otherFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

@Override
Expand Down Expand Up @@ -188,7 +188,7 @@ protected final BaseTable<?> redefine(@NotNull final TableDefinition newDefiniti
componentFactory, locationProvider, updateSourceRegistrar, partitioningColumnDefinitions,
partitioningColumnFilters);
return new DeferredViewTable(newDefinition, description + "-retainColumns",
new PartitionAwareQueryTableReference(redefined),
new PartitionAwareTableReference(redefined),
droppedPartitioningColumnDefinitions.stream().map(ColumnDefinition::getName).toArray(String[]::new),
null, null);
}
Expand All @@ -198,8 +198,8 @@ protected final Table redefine(TableDefinition newDefinitionExternal, TableDefin
SelectColumn[] viewColumns) {
BaseTable<?> redefined = redefine(newDefinitionInternal);
DeferredViewTable.TableReference reference = redefined instanceof PartitionAwareSourceTable
? new PartitionAwareQueryTableReference((PartitionAwareSourceTable) redefined)
: new DeferredViewTable.SimpleTableReference(redefined);
? new PartitionAwareTableReference((PartitionAwareSourceTable) redefined)
: new DeferredViewTable.TableReference(redefined);
return new DeferredViewTable(newDefinitionExternal, description + "-redefined",
reference, null, viewColumns, null);
}
Expand Down Expand Up @@ -260,28 +260,33 @@ private Table whereImpl(final WhereFilter[] whereFilters) {

final QueryCompilerRequestProcessor.BatchProcessor compilationProcessor = QueryCompilerRequestProcessor.batch();
final List<WhereFilter> partitionFilters = new ArrayList<>();
final List<WhereFilter> deferredFilters = new ArrayList<>();
final List<WhereFilter> otherFilters = new ArrayList<>();
for (WhereFilter whereFilter : whereFilters) {
whereFilter.init(definition, compilationProcessor);
if (!(whereFilter instanceof ReindexingFilter)
&& isValidAgainstColumnPartitionTable(whereFilter.getColumns(), whereFilter.getColumnArrays())) {
partitionFilters.add(whereFilter);
} else {
deferredFilters.add(whereFilter);
otherFilters.add(whereFilter);
}
}
compilationProcessor.compile();

final PartitionAwareSourceTable withPartitionsFiltered = partitionFilters.isEmpty()
? this
: QueryPerformanceRecorder.withNugget("getFilteredTable(" + partitionFilters + ")",
() -> getFilteredTable(partitionFilters));
// If we have no partition filters, we defer all filters.
if (partitionFilters.isEmpty()) {
return new DeferredViewTable(definition, getDescription() + "-withDeferredFilters",
new PartitionAwareTableReference(this), null, null,
otherFilters.toArray(WhereFilter.ZERO_LENGTH_WHERE_FILTER_ARRAY));
}

return deferredFilters.isEmpty()
? withPartitionsFiltered
: new DeferredViewTable(definition, withPartitionsFiltered.getDescription() + "-withDeferredFilters",
new PartitionAwareQueryTableReference(withPartitionsFiltered), null, null,
deferredFilters.toArray(WhereFilter.ZERO_LENGTH_SELECT_FILTER_ARRAY));
// If we have any partition filters, we first create a new instance that filters the location keys accordingly,
// then coalesce, and then apply the remaining filters to the coalesced result.
final Table withPartitionsFiltered = QueryPerformanceRecorder.withNugget(
"getFilteredTable(" + partitionFilters + ")", () -> getFilteredTable(partitionFilters));
final Table coalesced = withPartitionsFiltered.coalesce();
return otherFilters.isEmpty()
? coalesced
: coalesced.where(Filter.and(otherFilters));
}

@Override
Expand Down
Loading
Loading