
Prune Nested Fields for Parquet Columns #5547

Closed
wants to merge 1 commit into from

Conversation

zhenxiao
Collaborator

Read necessary fields only for Parquet nested columns
Currently, Presto will read all the fields in a struct for Parquet columns.
e.g.

select s.a, s.b
from t

If the table is a Parquet file with a struct column s: {a int, b double, c long, d float}, Presto currently reads a, b, c, and d from s and outputs just a and b.

For columnar storage formats such as Parquet or ORC, we can do better by reading only the necessary fields. In the previous example, we would read just {a int, b double} from s and skip the other fields to save IO.

This patch introduces an optional NestedFields member in ColumnHandle. When optimizing the plan, the PruneNestedColumns optimizer visits expressions and puts candidate nested fields into the ColumnHandle. When scanning Parquet files, the record reader can then use NestedFields to request only the necessary fields.
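
As a rough sketch of the idea (not code from this patch; the class and method names below are hypothetical), pruning a Parquet struct schema down to the requested fields could look like this:

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import parquet.schema.GroupType;
    import parquet.schema.Type;

    public final class NestedFieldPruner
    {
        private NestedFieldPruner() {}

        // Keep only the requested fields of a struct column, e.g. {"a", "b"} for column s
        public static GroupType pruneStruct(GroupType struct, Set<String> requestedFields)
        {
            List<Type> keptFields = struct.getFields().stream()
                    .filter(field -> requestedFields.contains(field.getName()))
                    .collect(Collectors.toList());
            // Rebuild the group with the same repetition and name, but only the kept fields
            return new GroupType(struct.getRepetition(), struct.getName(), keptFields);
        }
    }

The record reader would then hand the pruned schema to Parquet as the requested projection, so the other fields are never materialized.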

This has a dependency on @jxiang's #4714, which gives us the flexibility to specify metastore schemas differently from Parquet file schemas.

@dain @martint @electrum @cberner @erichwang any comments are appreciated

@ghost ghost added the CLA Signed label Jun 30, 2016
@@ -204,7 +204,7 @@
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
<scope>provided</scope>
Contributor

The hive connector shouldn't have a dependency on presto-main. In fact, the classes in presto-main are not guaranteed to be available to plugins (due to classloader isolation)

Collaborator Author

Got it. We really need RowType and RowField, which are in presto-main, not in the SPI. Is there any special reason RowType (MapType, etc.) is not in the SPI? Could we move RowType into the SPI? Or maybe duplicate the code in presto-hive? @martint @cberner @dain

@zhenxiao zhenxiao force-pushed the nested-pruning branch 2 times, most recently from a3a8c62 to da929c1 Compare July 4, 2016 02:13
List<parquet.schema.Type> fieldTypes = entryType.getFields();

if (useNames) {
this.converters = createConvertersByName(columnName, ((RowType) prestoType).getFields(), fieldTypes);
Collaborator Author

@cberner we need to get RowType's fields and use the RowField name to look up the corresponding Parquet schema in the Parquet files

Contributor

Yeah, you should be able to just use the TypeSignature; all the names will be in there

@zhenxiao
Collaborator Author

zhenxiao commented Jul 8, 2016

With @cberner's help, I addressed the comments; the presto-main dependency is no longer needed.
@cberner @dain @martint @electrum comments or suggestions are appreciated

@ghost ghost added the CLA Signed label Jul 12, 2016
@cberner
Contributor

cberner commented Jul 12, 2016

@nezihyigitbasi do you have time to take a first look at this?

@nezihyigitbasi
Contributor

@cberner sure, will do.

@nezihyigitbasi
Contributor

@zhenxiao I gave this patch a try and I think I hit a bug. Once you fix the issue I will continue reviewing.

I first enabled the new optimizer in the config file and then created a test table

presto:nyigitbasi> create table nested_field_test as select CAST(row(1, 'a', 'b', 'c') AS ROW(f1 integer, f2 varchar, f3 varchar, f4 varchar)) as row_field;
CREATE TABLE: 1 row

presto:nyigitbasi> desc nested_field_test;
  Column   |                        Type                         | Comment
-----------+-----------------------------------------------------+---------
 row_field | row(F1 integer, F2 varchar, F3 varchar, F4 varchar) |
(1 row)

then when I query the table it fails:

presto:nyigitbasi> select row_field.F1 from nested_field_test;

Query 20160713_002306_00019_he7h3, FAILED, 1 node
http://localhost:8080/query.html?20160713_002306_00019_he7h3
Splits: 2 total, 0 done (0.00%)
CPU Time: 0.5s total,     0 rows/s,     0B/s, 37% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
1:13 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20160713_002306_00019_he7h3 failed: Error opening Hive split s3n://netflix-dataoven-prod-users/hive/warehouse/nyigitbasi.db/nested_field_test/20160713_002223_00017_he7h3_a34c4336-28ce-4a72-9253-3529044d3710 (offset=0, length=548): Schema mismatch, metastore schema for row column row_field has 4 fields but parquet schema has 1 fields
com.facebook.presto.spi.PrestoException: Error opening Hive split s3n://netflix-dataoven-prod-users/hive/warehouse/nyigitbasi.db/nested_field_test/20160713_002223_00017_he7h3_a34c4336-28ce-4a72-9253-3529044d3710 (offset=0, length=548): Schema mismatch, metastore schema for row column row_field has 4 fields but parquet schema has 1 fields
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.createParquetRecordReader(ParquetHiveRecordCursor.java:489)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.<init>(ParquetHiveRecordCursor.java:254)
    at com.facebook.presto.hive.parquet.ParquetRecordCursorProvider.createHiveRecordCursor(ParquetRecordCursorProvider.java:96)
    at com.facebook.presto.hive.HivePageSourceProvider.getHiveRecordCursor(HivePageSourceProvider.java:129)
    at com.facebook.presto.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:107)
    at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:44)
    at com.facebook.presto.split.PageSourceManager.createPageSource(PageSourceManager.java:48)
    at com.facebook.presto.operator.ScanFilterAndProjectOperator.createSourceIfNecessary(ScanFilterAndProjectOperator.java:292)
    at com.facebook.presto.operator.ScanFilterAndProjectOperator.isFinished(ScanFilterAndProjectOperator.java:180)
    at com.facebook.presto.operator.Driver.processInternal(Driver.java:375)
    at com.facebook.presto.operator.Driver.processFor(Driver.java:301)
    at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:618)
    at com.facebook.presto.execution.TaskExecutor$PrioritizedSplitRunner.process(TaskExecutor.java:529)
    at com.facebook.presto.execution.TaskExecutor$Runner.run(TaskExecutor.java:665)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Schema mismatch, metastore schema for row column row_field has 4 fields but parquet schema has 1 fields
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor$ParquetStructConverter.createConverters(ParquetHiveRecordCursor.java:864)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor$ParquetStructConverter.<init>(ParquetHiveRecordCursor.java:946)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.createGroupConverter(ParquetHiveRecordCursor.java:821)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.access$300(ParquetHiveRecordCursor.java:125)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor$PrestoReadSupport.<init>(ParquetHiveRecordCursor.java:534)
    at com.facebook.presto.hive.parquet.ParquetHiveRecordCursor.createParquetRecordReader(ParquetHiveRecordCursor.java:428)
    ... 16 more

But if I disable the new optimizer I can query the table fine.

presto:nyigitbasi> set session optimize_nested_columns=false;
SET SESSION
presto:nyigitbasi> select row_field.F1 from nested_field_test;
 f1
----
  1
(1 row)

@zhenxiao
Collaborator Author

@nezihyigitbasi Thank you, I got it fixed. Your comments are appreciated.

@ghost ghost added the CLA Signed label Jul 13, 2016
@@ -497,17 +505,19 @@ public PrestoParquetRecordReader(PrestoReadSupport readSupport)
private final boolean useParquetColumnNames;
private final List<HiveColumnHandle> columns;
private final List<Converter> converters;
private final TypeManager typeManager;
Contributor

Do you need this as a field? It is not used anywhere; the constructor arg typeManager is passed to the createGroupConverter() call.

@zhenxiao
Collaborator Author

Thank you, @mbasmanova.
I addressed the comments.

Contributor

@mbasmanova mbasmanova left a comment

@zhenxiao Some more comments.

return ExpressionTreeRewriter.rewriteWith(new DereferenceReplacer(replacements), expression);
}

protected static class DereferenceReplacer
Contributor

private

return false;
}

protected static List<DereferenceExpression> extractDereference(Expression expression)
Contributor

since this method returns a list, extractDereferenceExpressions might be a better name

return filterNode;
}

List<DereferenceExpression> predicates = extractDereference(filterNode.getPredicate());
Contributor

This variable contains a list of dereference expressions used in the predicate, but the expressions themselves are not predicates, e.g. a.b < 10 is a predicate, but a.b is not. Since this variable is used only once, it can be inlined, which removes the need to come up with a better name.

return Result.ofPlanNode(target);
}

protected abstract PlanNode dereferencePushDown(Context context, N targetNode, Map<DereferenceExpression, Symbol> expressions, Assignments assignments);
Contributor

as a general rule, method names start with a verb, e.g. pushDownDereferences

return projectNode;
}

Map<Symbol, Expression> pushdownDereferences = pushdownExpressions.entrySet().stream().collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
Contributor

This variable is not needed. You can swap key and value in pushdownExpressions instead.

        Map<Symbol, DereferenceExpression> pushdownExpressions = expressions.entrySet().stream()
                .filter(entry -> !isCastRowType(entry.getKey()))
                .filter(entry -> outputSymbols.contains(getOnlyElement(extractAll(entry.getKey()))))
                .collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
...
        ImmutableMap.Builder<Symbol, Symbol> symbolsBuilder = ImmutableMap.builder();
        pushdownExpressions.entrySet().stream()
                .forEach(entry -> symbolsBuilder.put(getOnlyElement(extractAll(entry.getValue())), entry.getKey()));
        Map<Symbol, Symbol> symbolsMap = symbolsBuilder.build();
...
        DereferenceExpression targetDereference = pushdownExpressions.get(targetSymbol);

return joinNode;
}

Map<Symbol, Expression> pushdownDereferences = pushdownExpressions.entrySet().stream().collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
Contributor

similar to other rules, this variable is not needed


Map<Symbol, Expression> pushdownDereferences = pushdownExpressions.entrySet().stream().collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));
ImmutableMap.Builder<Symbol, Symbol> symbolsBuilder = ImmutableMap.builder();
for (Map.Entry<Expression, Symbol> entry : pushdownExpressions.entrySet()) {
Contributor

consider

        pushdownExpressions.entrySet().stream()
                .forEach(entry -> symbolsBuilder.put(getOnlyElement(extractAll(entry.getValue())), entry.getKey()));

.map(EquiJoinClause::toExpression)
.map(expression -> replaceDereferences(expression, expressions))
.map(this::getEquiJoinClause)
.collect(toImmutableList());
Contributor

EquiJoinClause can only reference symbols; it cannot contain dereference expressions, can it?

Assignments assignments = assignmentsBuilder.build();

PlanNode result = dereferencePushDown(context, child, expressions, assignments);
if (result.getId().equals(child.getId())) {
Contributor

I think it would be clearer if dereferencePushDown returned a Result. Then this check would be simplified to if (result.isEmpty())

@mbasmanova mbasmanova self-assigned this Mar 19, 2019
Collaborator Author

@zhenxiao zhenxiao left a comment

Thank you, @mbasmanova.
I addressed the comments.

Symbol targetSymbol = symbolsMap.get(entry.getKey());
DereferenceExpression targetDereference = (DereferenceExpression) pushdownDereferences.get(targetSymbol);
if (entry.getValue() instanceof DereferenceExpression) {
sourceBuilder.put(targetSymbol, targetDereference);
Collaborator Author

You are correct, this never happens; it is dead code. I will fix it.

Contributor

@mbasmanova mbasmanova left a comment

@zhenxiao A few questions.

return context.getSymbolAllocator().newSymbol(expression, type);
}

private static boolean prefixExist(DereferenceExpression expression, final Set<DereferenceExpression> dereferences)
Contributor

drop final

Expression base = expression.getBase();
while (base != null) {
if (base instanceof SymbolReference) {
return dereferences.contains(base);
Contributor

dereferences is a set of DereferenceExpression and base is an instance of SymbolReference; will this ever return true? I don't think so. Looks like this whole if statement can be deleted and the outer loop simplified to

        while (base instanceof DereferenceExpression) {
            if (dereferences.contains(base)) {
                return true;
            }
            base = ((DereferenceExpression) base).getBase();
        }

return context.getSymbolAllocator().newSymbol(expression, type);
}

private static boolean prefixExist(DereferenceExpression expression, final Set<DereferenceExpression> dereferences)
Contributor

baseExists might be a better name


private static Map<DereferenceExpression, Symbol> getDereferenceSymbolMap(ProjectNode node, Context context, Metadata metadata, SqlParser sqlParser)
{
Set<DereferenceExpression> expressions = extractExpressionsNonRecursive(node).stream()
Contributor

node.getAssignments().getExpressions() is equivalent but easier to understand

for (Map.Entry<Symbol, Expression> entry : node.getAssignments().entrySet()) {
assignmentsBuilder.put(entry.getKey(), ExpressionTreeRewriter.rewriteWith(new DereferenceReplacer(expressions), entry.getValue()));
}
Assignments assignments = assignmentsBuilder.build();
Contributor

Simplify using Assignments::rewrite:

Assignments assignments = node.getAssignments().rewrite(new DereferenceReplacer(expressions));

@Test
public void testDereferencePushdownProject()
{
assertPlan("WITH t1 as ( SELECT * FROM (values ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE)))) as t (msg) ) SELECT msg.x FROM t1 WHERE msg.x > 10",
Contributor

Reformat for readability (capitalize SQL keywords, split into multiple lines, replace t1 with t(msg)):

        assertPlan("WITH t(msg) AS (SELECT * FROM (VALUES ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE))))) " +
                        "SELECT msg.x FROM t WHERE msg.x > 10",
                output(ImmutableList.of("x"),
                    project(ImmutableMap.of("x", expression("msg.x")),
                            filter("msg.x > BIGINT '10'",
                                    values("msg")))));

However, I'd expect msg.x projection to be pushed down, e.g. I'd think the plan would look like

Output(x)
   Filter(x > 10)
      Project(x: msg.x)
         Values

Also, I removed the new rules from the optimizer and re-ran the tests. All tests but testDereferencePushdownJoin succeeded. Could you modify the tests to use queries that are affected by the new rules?

Collaborator Author

Let me rework the test cases.
PredicatePushDown will push the filter down just on top of the table scan or values. It is OK to have the dereference just on top of the filter, as long as ScanFilterAndProject has the dereferences, so that virtual nested columns can be generated by leveraging the dereferences in the connector. What do you think?

Contributor

@zhenxiao That's right. This is a subtle point that's not easy to get from reading the rules code. Perhaps modify the rules to not do anything for project(filter(tablescan)) and project(filter(values)). It would also help to update the PR description to give examples of queries where dereference pushdown will be successful and to show queries where the new rules don't make a difference. For the individual rules, consider adding documentation. See com.facebook.presto.sql.planner.iterative.rule.PushProjectionThroughExchange for an example.

Collaborator Author

@zhenxiao zhenxiao left a comment

Thank you, @mbasmanova.
Let me rework the test cases; the other comments are addressed.

Contributor

@mbasmanova mbasmanova left a comment

@zhenxiao Some further questions and comments.

if (result.isEmpty()) {
return Result.empty();
}
ProjectNode target = new ProjectNode(context.getIdAllocator().getNextId(), result.getTransformedPlan().get(), assignments);
Contributor

nit: inline this variable


DereferenceReplacer(Map<DereferenceExpression, Symbol> expressions)
{
this.expressions = expressions;
Contributor

requireNonNull: this.expressions = requireNonNull(expressions, "expressions is null");

new DereferencePushDownFilter(metadata, sqlParser),
new DereferencePushDownJoin(metadata, sqlParser),
new DereferencePushDownProject(metadata, sqlParser),
new DereferencePushDownSort(metadata, sqlParser));
Contributor

This is a bit verbose. How do you feel about creating a top-level PushDownDereferences class and converting all these rules into inner classes?

dereferencePushDownRules = new PushDownDereferences(metadata, sqlParser).rules();

The dereferencePushDownRules variable can then be inlined.

List<Symbol> outputSymbols = filterNode.getOutputSymbols();
Map<Symbol, Expression> pushdownExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
.filter(entry -> outputSymbols.contains(getOnlyElement(extractAll(entry.getKey()))))
Contributor

Why is it guaranteed that extractAll(entry.getKey()) returns just one element? I'm thinking of a dereference expression f(a, b).c which contains two symbols: a and b.

{
List<Symbol> outputSymbols = filterNode.getOutputSymbols();
Map<Symbol, Expression> pushdownExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
Contributor

What's the logic behind checking for cast to row?

.collect(toImmutableMap(Map.Entry::getValue, Map.Entry::getKey));

if (pushdownExpressions.isEmpty()) {
return Result.empty();
Contributor

Why exit here? What if the predicate has dereferences that can be pushed down?


ImmutableList.Builder<Symbol> outputBuilder = ImmutableList.builder();
outputBuilder.addAll(leftChild.getOutputSymbols()).addAll(rightChild.getOutputSymbols());
JoinNode result = new JoinNode(context.getIdAllocator().getNextId(), joinNode.getType(), leftChild, rightChild, joinNode.getCriteria(), outputBuilder.build(), joinFilter, joinNode.getLeftHashSymbol(), joinNode.getRightHashSymbol(), joinNode.getDistributionType());
Contributor

This line is too long. It might be easier to read if rewritten to

        return Result.ofPlanNode(
                new JoinNode(
                    context.getIdAllocator().getNextId(),
                    joinNode.getType(),
                    leftChild,
                    rightChild,
                    joinNode.getCriteria(),
                    ImmutableList.<Symbol>builder()
                            .addAll(leftChild.getOutputSymbols())
                            .addAll(rightChild.getOutputSymbols())
                            .build(),
                    joinNode.getFilter().map(expression -> replaceDereferences(expression, expressions)),
                    joinNode.getLeftHashSymbol(),
                    joinNode.getRightHashSymbol(),
                    joinNode.getDistributionType()));

return Result.ofPlanNode(result);
}

private EquiJoinClause getEquiJoinClause(Expression expression)
Contributor

this method is not used

PlanNode right = joinNode.getRight();

Assignments.Builder leftBuilder = Assignments.builder();
List<Symbol> leftOutputs = left.getOutputSymbols().stream()
Contributor

inline this variable and rightOutputs

        Assignments.Builder leftBuilder = Assignments.builder();
        leftBuilder.putIdentities(left.getOutputSymbols().stream()
                .filter(symbol -> !symbolsMap.containsKey(symbol))
                .collect(toImmutableList()));

        Assignments.Builder rightBuilder = Assignments.builder();
        rightBuilder.putIdentities(right.getOutputSymbols().stream()
                .filter(symbol -> !symbolsMap.containsKey(symbol))
                .collect(toImmutableList()));

return Result.empty();
}

ImmutableMap.Builder<Symbol, Symbol> symbolsBuilder = ImmutableMap.builder();
Contributor

This can be simplified as

        Map<Symbol, Symbol> symbolsMap = pushdownExpressions.entrySet().stream()
                .collect(toImmutableMap(entry -> getOnlyElement(extractAll(entry.getValue())), Map.Entry::getKey));

@zhenxiao
Collaborator Author

Thank you, @mbasmanova.
I addressed the comments.

A few updates:

  1. We need dereference pushdown for Project(Join); otherwise joins would output the whole struct.
  2. We need dereference pushdown for Project(Project) and Project(Sort), so that dereferences can pass through Project and Sort, and down to joins, if there are any.
  3. ScanFilterAndProject will not change, as projections belonging to the same table are merged into ScanFilterAndProject.
  4. We do not need Project(Filter) or Project(Aggregation). The project either already happens at a lower level, or predicate pushdown guarantees Project(Filter(TableScan)), which is OK: ScanFilterAndProject has all the projections, and we can leverage it for the following steps, e.g. getNestedColumnHandles.

Contributor

@mbasmanova mbasmanova left a comment

@zhenxiao Some further questions and comments.

@@ -200,6 +201,7 @@ public PlanOptimizers(
Set<Rule<?>> predicatePushDownRules = ImmutableSet.of(
new MergeFilters());

Set<Rule<?>> dereferencePushDownRules = new PushDownDereferences(metadata, sqlParser).rules();
Contributor

inline this variable

}
}

/**
Contributor

wrong indentation

new DereferencePushDownProject(metadata, sqlParser));
}

public abstract class DereferencePushDownRule<N extends PlanNode>
Contributor

private

* Transforms:
* <pre>
* Project(a.msg.x)
* Join(a.msg.y = b.msg.y)
Contributor

This is confusing because join's equi-clause can't contain expressions. The following might be clearer:

     * Transforms:
     * <pre>
     *  Project(a_x := a.msg.x)
     *    Join(a_y = b_y) => [a]
     *      Project(a_y := a.msg.y)
     *          Source(a)
     *      Project(b_y := b.msg.y)
     *          Source(b)
     *  </pre>
     * to:
     * <pre>
     *  Join(a_y = b_y) => [a_x]
     *    Project(a_x := a.msg.x, a_y := a.msg.y)
     *      Source(a)
     *    Project(b_y := b.msg.y)
     *      Source(b)
     * </pre>

* Source(b)
* </pre>
*/
public class DereferencePushDownJoin
Contributor

PushDownDereferenceThroughJoin might be a better name

}
}

protected static Expression replaceDereferences(Expression expression, Map<DereferenceExpression, Symbol> replacements)
Contributor

private

}
}

protected static List<DereferenceExpression> extractDereferenceExpressions(Expression expression)
Contributor

private

.collect(toImmutableMap(Function.identity(), expression -> newSymbol(expression, context, metadata, sqlParser)));
}

protected static Symbol newSymbol(Expression expression, Context context, Metadata metadata, SqlParser sqlParser)
Contributor

private

@Test
public void testDereferencePushdownJoin()
{
assertPlan("WITH t(msg) AS ( SELECT * FROM (VALUES ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE))))) " +
Contributor

The field, field1, left and right names are not easy to follow. How about msg, a_y, b_y, etc.?

        assertPlan("WITH t(msg) AS (SELECT * FROM (VALUES ROW(CAST(ROW(1, 2.0) AS ROW(x BIGINT, y DOUBLE))))) " +
                        "SELECT b.msg.x FROM t a, t b WHERE a.msg.y = b.msg.y",
                output(ImmutableList.of("b_x"),
                        join(INNER, ImmutableList.of(equiJoinClause("a_y", "b_y")),
                                anyTree(
                                        project(ImmutableMap.of("a_y", expression("msg.y")),
                                                values("msg"))
                                ), anyTree(
                                        project(ImmutableMap.of("b_y", expression("msg.y"), "b_x", expression("msg.x")),
                                                values("msg"))))));

Collaborator Author

@zhenxiao zhenxiao left a comment

Thank you, @mbasmanova.
I addressed the comments.
I will add more test-case coverage. Could you please review when you are free?

{
List<Symbol> outputSymbols = joinNode.getOutputSymbols();
Map<Symbol, Expression> pushdownExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
Collaborator Author

There is no need to push down a (CAST AS ROW).field dereference, since the lower level does not have it as a row.
Correct me if my understanding is wrong.
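
For context, a check along these lines (a hypothetical sketch, not necessarily the patch's actual isCastRowType implementation) would simply look at whether the dereference base is a CAST expression:

    import com.facebook.presto.sql.tree.Cast;
    import com.facebook.presto.sql.tree.DereferenceExpression;
    import com.facebook.presto.sql.tree.Expression;

    final class DereferenceChecks
    {
        private DereferenceChecks() {}

        // Hypothetical sketch: treat a dereference as non-pushable when its base is a CAST,
        // e.g. CAST(... AS ROW(...)).field, because the source below does not expose that row directly.
        static boolean hasCastBase(DereferenceExpression expression)
        {
            Expression base = expression.getBase();
            return base instanceof Cast;
        }
    }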

* Project(a.key)
* Project(a.msg.x)
* Source(a)
* </pre>
Collaborator Author

My bad. I meant that key is a primitive type while msg is a struct type, and we only push down dereferences. Will fix it.

* Sort
* Project(a.msg.x)
* Source(a)
* </pre>
Collaborator Author

My bad, it is pushing the dereference through sort. Will fix it and add unnest.

@zhenxiao zhenxiao force-pushed the nested-pruning branch 2 times, most recently from 099c844 to 4ab0fa9 Compare March 26, 2019 06:53
Contributor

@mbasmanova mbasmanova left a comment

@zhenxiao Some further questions.

{
List<Symbol> outputSymbols = joinNode.getOutputSymbols();
Map<Symbol, Expression> pushdownExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
Contributor

Indeed, but the same can be said about any dereference where base is not a symbol, e.g. f(a).x. Why is it important to filter out cast, but not other functions? Could you add a test that covers these cases?

List<Symbol> outputSymbols = joinNode.getOutputSymbols();
Map<Symbol, Expression> projectExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
.filter(entry -> outputSymbols.containsAll(extractAll(entry.getKey())))
Contributor

I still don't understand this filter. entry.getKey() is a dereference expression coming from a projection above this join. extractAll(entry.getKey()) returns all symbols that are used in that dereference expression. outputSymbols are symbols produced by the join; these are inputs to the project node above. With project over join, the project can only use output symbols of the join. Hence, this check must always be true. Hence, it is not needed. Am I missing something? I commented out this check and the above check for cast and ran the tests. All passed.

public Result apply(ProjectNode node, Captures captures, Context context)
{
N child = captures.get(targetCapture);
Map<DereferenceExpression, Symbol> expressions = getDereferenceSymbolMap(node.getAssignments().getExpressions().stream().collect(toImmutableList()), context, metadata, sqlParser);
Contributor

.stream().collect(toImmutableList()) is unnecessary; just change the type of the first argument of getDereferenceSymbolMap to Collection<Expression>.

dereferenceSymbolsBuilder.putAll(expressions);
if (joinNode.getFilter().isPresent()) {
Map<DereferenceExpression, Symbol> predicateSymbols = getDereferenceSymbolMap(ImmutableList.of(joinNode.getFilter().get()), context, metadata, sqlParser).entrySet().stream()
.filter(entry -> !projectExpressions.values().contains(entry.getKey()))
Contributor

Shouldn't this filter use baseExists?

Collaborator Author

No. This is to filter out join-filter dereferences that are not covered in projectExpressions.
baseExists is to filter out dereferences whose base already exists among the other dereferences.

{
return ImmutableSet.of(
new PushDownDereferenceThroughJoin(metadata, sqlParser),
new PushDownDereferenceThroughSort(metadata, sqlParser),
Contributor

Could you scan through all the node types and see if there are more nodes to add here? I'm thinking about AssignUniqueId, MarkDistinct, Limit and SemiJoin.

* </pre>
* to:
* <pre>
* Project(a_x := a_z)
Contributor

This project node is redundant. Why not remove it?

Collaborator Author

There is RemoveRedundantIdentityProjections following PushDownDereferences, so that redundant project node gets removed. I will remove it from the comments. Or do you think we should add RemoveRedundantIdentityProjections to PushDownDereferences?

* </pre>
* to:
* <pre>
* Project(a_x := a_y)
Contributor

This project node is redundant.

* </pre>
* to:
* <pre>
* Project(a_x := a_y)
Contributor

This project node is redundant.

* </pre>
* to:
* <pre>
* Join(a_y = b_y) => [a_x]
Contributor

Other examples show an identity projection in the result, but it is missing here. At the same time the code seems to actually generate an identity projection, and there is no logic to remove it. Let's either add it here or add logic to the rule to remove it.

.collect(toImmutableSet());

return dereferences.stream()
.filter(expression -> !baseExists(expression, dereferences))
Contributor

Tests only cover one-level deep dereferences, e.g. msg.x and msg.y. Would you add tests with more levels to test the logic of consolidating a.b.c and a.b into a.b?

Collaborator Author

@zhenxiao zhenxiao left a comment

Thank you, @mbasmanova.
I addressed the comments.
I will add support for AssignUniqueId, MarkDistinct, Limit, Aggregation, and SemiJoin.

{
List<Symbol> outputSymbols = joinNode.getOutputSymbols();
Map<Symbol, Expression> pushdownExpressions = expressions.entrySet().stream()
.filter(entry -> !isCastRowType(entry.getKey()))
Collaborator Author

Yep, we should only push down dereferences whose base is a DereferenceExpression or SymbolReference. Will fix it.
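
A check along the lines of that fix (a hypothetical sketch, not the committed code) could walk the base chain and require it to bottom out in a plain symbol reference:

    import com.facebook.presto.sql.tree.DereferenceExpression;
    import com.facebook.presto.sql.tree.Expression;
    import com.facebook.presto.sql.tree.SymbolReference;

    final class PushdownChecks
    {
        private PushdownChecks() {}

        // Hypothetical sketch: a dereference is a pushdown candidate only if its base chain
        // consists of dereferences ending in a symbol reference, so f(a).x and CAST(...).x are rejected.
        static boolean hasSymbolReferenceBase(DereferenceExpression expression)
        {
            Expression base = expression.getBase();
            while (base instanceof DereferenceExpression) {
                base = ((DereferenceExpression) base).getBase();
            }
            return base instanceof SymbolReference;
        }
    }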

@mbasmanova
Contributor

@zhenxiao

I addressed the comments.
I will add support for AssignUniqueId, MarkDistinct, Limit, Aggregation, and SemiJoin.

Sounds good. Ping me when the changes are ready for review.

@phd3
Contributor

phd3 commented Apr 15, 2019

@zhenxiao I was trying your patch on some examples and the following query fails with the PushDownDereferences optimizer.

with t(myint, arr) as
          (SELECT * FROM (VALUES (5, CAST(ARRAY[ROW(1,ROW(2,5)),ROW(2,ROW(3,9))] AS ARRAY(ROW(BIGINT, ROW(a BIGINT, b BIGINT)))))))
          select w.colb.a from t cross join unnest(arr) as w(cola, colb)

Plan for this query without the optimizer:

- Output[a] => [expr_16:bigint]
        a := expr_16
    - Project[] => [expr_16:bigint]
            expr_16 := "field_15".a
        - Unnest[replicate=, unnest=field_0:array(row(bigint, row(a bigint, b bigint)))] => [field_14:bigint, field_15:row(a bigint, b bigint)]
            - Values => [field_0:array(row(bigint, row(a bigint, b bigint)))]
                    (CAST($literal$array(row(integer,row(integer,integer)))(from_base64(VARCHAR(184) AwAAAFJPVw...))))

Do you know why that might be the case?

@phd3
Contributor

phd3 commented Apr 15, 2019

@zhenxiao Can you please add some tests for unnest queries on multi-level complex data types in arrays? For example, pushdown in cases where only field y.w.u is accessed after unnesting array(row(x BIGINT, y row(z BIGINT, w row(u BIGINT, v DOUBLE))))?

The test cases will give us better documentation of what this feature does and does not support.

@mbasmanova
Contributor

I assume #13271 supersedes this one, hence closing.

@mbasmanova mbasmanova closed this Aug 27, 2019
@zhenxiao zhenxiao deleted the nested-pruning branch January 22, 2022 15:33