Support aggregate functions in row pattern recognition #8738

Merged: kasiafi merged 6 commits into trinodb:master from the 263PatternAggregations branch on Dec 3, 2021

Conversation

kasiafi
Member

@kasiafi kasiafi commented Jul 31, 2021

No description provided.

@cla-bot cla-bot bot added the cla-signed label Jul 31, 2021
@kasiafi kasiafi added the WIP label Jul 31, 2021
@kasiafi
Member Author

kasiafi commented Jul 31, 2021

Example output from PlanPrinter with aggregations:

trino:tiny> EXPLAIN SELECT running_labels, max_id
         ->         FROM (VALUES 1) t(id)
         ->         MATCH_RECOGNIZE (
         ->             MEASURES
         ->                      RUNNING array_agg(CLASSIFIER(A)) AS running_labels,
         ->                      FIRST(A.id) + FINAL max_by(id, MATCH_NUMBER()) AS max_id
         ->             PATTERN (A*)
         ->             DEFINE A AS avg(A.id) > 0.5
         ->         );
                                    Query Plan
-----------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [running_labels, max_id]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Output[running_labels, max_id]
     │   Layout: [running_labels:array(varchar), max_id:integer]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     └─ PatterRecognition[]
        │   Layout: [running_labels:array(varchar), max_id:integer]
        │   running_labels := "array_agg"
        │       array_agg := RUNNING array_agg("classifier"){A}
        │   max_id := ("field_0" + "max_by")
        │       field_0 := field[RUNNING FIRST({A})]
        │       max_by := FINAL max_by("field", "match_number"){}
        │   ONE ROW PER MATCH
        │   AFTER MATCH SKIP PAST LAST ROW
        │   pattern[A{0, ∞}] (INITIAL)
        │   subsets[]
        │   A := ("avg" > CAST(DECIMAL '0.5' AS double))
        │       avg := RUNNING avg("expr"){A}
        └─ LocalExchange[SINGLE] ()
           │   Layout: [expr:bigint, field:integer]
           │   Estimates: {rows: 1 (14B), cpu: 19, memory: 0B, network: 0B}
           └─ Project[]
              │   Layout: [expr:bigint, field:integer]
              │   Estimates: {rows: 1 (14B), cpu: 19, memory: 0B, network: 0B}
              │   expr := CAST("field" AS bigint)
              └─ LocalExchange[ROUND_ROBIN] ()
                 │   Layout: [field:integer]
                 │   Estimates: {rows: 1 (5B), cpu: 5, memory: 0B, network: 0B}
                 └─ Values
                        Layout: [field:integer]
                        Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
                        (1)


(1 row)

@kasiafi kasiafi force-pushed the 263PatternAggregations branch 6 times, most recently from 51a7fb5 to 1302493 Compare August 7, 2021 12:06
@kasiafi kasiafi force-pushed the 263PatternAggregations branch 4 times, most recently from a60097c to e5ad1db Compare August 27, 2021 13:05
@kasiafi kasiafi removed the WIP label Aug 27, 2021
@kasiafi kasiafi force-pushed the 263PatternAggregations branch from e5ad1db to c11e350 Compare August 27, 2021 13:47
@kasiafi kasiafi requested a review from martint August 27, 2021 16:03
@kasiafi kasiafi force-pushed the 263PatternAggregations branch from 01d5018 to 5ecdc01 Compare August 30, 2021 15:51
@kasiafi
Member Author

kasiafi commented Aug 30, 2021

@mosabua could you please review the docs part?

Member

@mosabua mosabua left a comment

Just had a look at the docs and suggested minor changes. Great job, and sorry for being a bit late.

docs/src/main/sphinx/sql/match-recognize.rst (10 review threads, resolved)
@kasiafi kasiafi force-pushed the 263PatternAggregations branch 4 times, most recently from b7d0fd6 to eedc5fc Compare October 5, 2021 11:33
@kasiafi
Member Author

kasiafi commented Oct 5, 2021

@mosabua I addressed your comments.

@kasiafi kasiafi force-pushed the 263PatternAggregations branch 2 times, most recently from 5b48467 to ea1611f Compare October 7, 2021 08:45
@kasiafi kasiafi force-pushed the 263PatternAggregations branch 2 times, most recently from ae16032 to f6e5e94 Compare October 8, 2021 12:22
@kasiafi kasiafi force-pushed the 263PatternAggregations branch 2 times, most recently from 2ffbc87 to e4338f8 Compare October 11, 2021 11:11
@kasiafi kasiafi requested a review from martint October 11, 2021 13:40
@JsonSubTypes.Type(value = ScalarValuePointer.class, name = "scalar"),
@JsonSubTypes.Type(value = AggregationValuePointer.class, name = "aggregation"),
})
public abstract class ValuePointer
Member

This could be an interface.

Member Author

I tried that, but with an interface the ValuePointers didn't serialize.

@kasiafi kasiafi force-pushed the 263PatternAggregations branch 2 times, most recently from 367f91f to 2cf1fcf Compare November 9, 2021 14:11
docs/src/main/sphinx/sql/match-recognize.rst Outdated Show resolved Hide resolved
Comment on lines +52 to +63
private int aggregated;

// length of the prefix of the match where all applicable positions were identified,
// and stored in `allPositions`. During the pattern matching phase it might exceed
// the `aggregated` prefix due to `getAllPositions()` calls.
private int evaluated;

// all identified applicable positions for the current match until the `evaluated` position, starting from 0
// this list is updated:
// - by `resolveNewPositions()`
// - by `getAllPositions()` (only during the pattern matching phase)
private final IntList allPositions;
Member

It's unclear what aggregated vs evaluated vs allPositions represent. I guess part of my confusion is due to not understanding how this class fits in the overall picture. Can you elaborate a bit?

Member Author

The SetEvaluator is part of the state of MatchAggregation.
Its main purpose is to provide the MatchAggregation with the new applicable positions(*) to aggregate, that is, those found since the previous call.
Its additional purpose is to keep a collection of all applicable positions for use by ThreadEquivalence.

Corresponding to these two purposes, two pointers mark how much input has been processed:

  • aggregated indicates how much input was processed by the MatchAggregation,
  • evaluated indicates how much input was processed to find applicable positions.

evaluated is never behind aggregated, because before running the aggregation on some portion of input, we must have resolved the applicable positions in it.
evaluated might be ahead of aggregated when we resolved the positions but didn't run the actual aggregation. That happens via the getAllPositions() call, when ThreadEquivalence asks for the sets of positions and compares them without running the aggregation.

  • allPositions are all the applicable positions up to the evaluated pointer.

(*) Applicable positions are the positions with certain labels, e.g. for avg(A.x) these are the positions labeled with A in the current match, with respect to the running or final semantics.
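
To make the relationship between the two pointers concrete, here is a minimal sketch. It is a simplified, hypothetical model, not the actual SetEvaluator: labels are plain strings, positions are all relative to match start, and the class and accessor names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the two pointers; not the actual Trino class.
public class SetEvaluatorSketch
{
    private final String label;   // label whose positions are applicable, e.g. "A"
    private int aggregated;       // prefix length already handed to the aggregation
    private int evaluated;        // prefix length already scanned for applicable positions
    private final List<Integer> allPositions = new ArrayList<>(); // relative to match start

    public SetEvaluatorSketch(String label)
    {
        this.label = label;
    }

    // Scans matchedLabels[evaluated..upTo) and records the applicable positions.
    private void evaluateUpTo(List<String> matchedLabels, int upTo)
    {
        for (int i = evaluated; i < upTo; i++) {
            if (matchedLabels.get(i).equals(label)) {
                allPositions.add(i);
            }
        }
        evaluated = Math.max(evaluated, upTo);
    }

    // Returns the applicable positions not yet aggregated and advances both pointers.
    public List<Integer> resolveNewPositions(List<String> matchedLabels)
    {
        evaluateUpTo(matchedLabels, matchedLabels.size());
        List<Integer> fresh = new ArrayList<>();
        for (int position : allPositions) {
            if (position >= aggregated) {
                fresh.add(position);
            }
        }
        aggregated = matchedLabels.size();
        return fresh;
    }

    // Returns all applicable positions without aggregating: only `evaluated` advances.
    public List<Integer> getAllPositions(List<String> matchedLabels)
    {
        evaluateUpTo(matchedLabels, matchedLabels.size());
        return List.copyOf(allPositions);
    }

    public int aggregated()
    {
        return aggregated;
    }

    public int evaluated()
    {
        return evaluated;
    }
}
```

In this model, a getAllPositions() call leaves aggregated untouched while evaluated moves forward, which is the "evaluated ahead of aggregated" case described above.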

@@ -175,6 +184,18 @@ public static Block compute(
return work.getResult();
}

public static Block[] precomputeNulls(List<PhysicalValueAccessor> expectedLayout)
Member

This method doesn't really need to know about PhysicalValueAccessors. It just cares about the types. Also, this doesn't seem like the right place to hold it.

Member Author

> This method doesn't really need to know about PhysicalValueAccessors. It just cares about the types

It cares about the types, but it also has to skip aggregations, which can be present in the list.
It all happens at construction time, and PhysicalValueAccessors are one of the constructor parameters, so I think we can pass just the accessors list instead of extracting the types in the constructor and passing them to the helper method.

> this doesn't seem like the right place to hold it

Why? What is the right place?

@kasiafi kasiafi force-pushed the 263PatternAggregations branch from 12b7513 to 2ad80f3 Compare November 16, 2021 11:35
@kasiafi kasiafi force-pushed the 263PatternAggregations branch from 2ad80f3 to d70a37f Compare November 26, 2021 19:26
docs/src/main/sphinx/sql/match-recognize.rst Outdated Show resolved Hide resolved
FunctionCall classifier = classifierCall.get();
if (!classifier.getArguments().isEmpty()) {
IrLabel label = irLabel(((Identifier) getOnlyElement(classifier.getArguments())).getCanonicalValue());
Set<IrLabel> labels = subsets.get(label);
Member

Can a subset refer to other subsets? If so, does this guarantee that all the subset references are expanded all the way down to the leaves?

Member Author

No, according to the spec, a subset can only refer to primary row pattern variables.
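
A consequence of that rule is that a single map lookup resolves a label to its primary variables, with no recursive expansion. The sketch below illustrates this under assumptions: labels are plain strings rather than IrLabel, and the class name, method name, and the fallback to a singleton set are hypothetical, not taken from the PR.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: labels are plain strings here, not Trino's IrLabel.
public class SubsetLookupSketch
{
    // Resolves a CLASSIFIER argument to primary labels. Because a subset may only list
    // primary pattern variables, one lookup is enough; nothing needs further expansion.
    public static Set<String> resolve(String label, Map<String, Set<String>> subsets)
    {
        Set<String> members = subsets.get(label);
        // Assumption: a label that is not a subset name stands for itself.
        return members != null ? members : Set.of(label);
    }
}
```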

private static class AggregationIndexes
{
final boolean foundNoClassifierAggregations;
final List<List<Integer>> noClassifierAggregations;
Member

Add an explanation of what this list and each of the contained lists represents. The signature of the field doesn't make that clear enough.

Member Author

@kasiafi kasiafi Dec 2, 2021

That is explained in a comment above the method classifyAggregations(), and also mentioned in the class javadoc, and commented in the method equivalent() where those structures are used.


private static class AggregationIndexes
{
final boolean foundNoClassifierAggregations;
Member

This seems unnecessary. Why not just rely on noClassifierAggregations.isEmpty() or something along those lines?

Member Author

Both noClassifierAggregations and classifierAggregations are List<List<>>.
Each has an entry for every label, and that entry is a list of the relevant aggregations.

So it is not easy to determine whether the whole structure is empty: the top-level list is non-empty even when every inner list is empty. Without the boolean, I would have to iterate over the top-level list to find out.

I added the boolean in answer to your comment: #8738 (comment). It allows skipping part of the computation at runtime.
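
A minimal sketch of the point, assuming a constructor that receives the nested list (the class name and constructor are hypothetical; only the two field names come from the snippet above):

```java
import java.util.List;

// Hypothetical sketch of the structure discussed; not the actual Trino class.
public class AggregationIndexesSketch
{
    public final boolean foundNoClassifierAggregations;
    // One entry per label; each entry lists the indexes of that label's aggregations.
    public final List<List<Integer>> noClassifierAggregations;

    public AggregationIndexesSketch(List<List<Integer>> noClassifierAggregations)
    {
        this.noClassifierAggregations = noClassifierAggregations;
        // Computed once at construction, so the runtime check is a single field read
        // instead of a scan over the top-level list.
        this.foundNoClassifierAggregations = noClassifierAggregations.stream()
                .anyMatch(indexes -> !indexes.isEmpty());
    }
}
```

With two labels and no aggregations, noClassifierAggregations.isEmpty() is false because the list holds two empty entries, while the flag is false as intended.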

}
}
}
noClassifierAggregations.add(noClassifierAggregationIndexes.build());
Member

What does it mean if noClassifierAggregationIndexes is empty? (i.e., none of the pointers hit the first branch). Same question for the other list below.

Member Author

It means that for a given label, in the label's defining condition, there are no aggregations of this kind:

  • "no-classifier aggregations" is the kind of aggregations whose result does not depend on the actual labels, e.g. max(A.price)
  • "classifier aggregations" is the kind of aggregations whose result depends on the actual labels, e.g. array_agg(CLASSIFIER(U)), where U is a subset {A, B}

These two kinds of aggregations are explained in the comment above the method classifyAggregations.

This might look like arbitrary partitioning, but it is based on the observation that for some aggregations it is enough to compare the sets of positions, while other aggregations also require comparing the matched labels.

The overall purpose is to optimize ThreadEquivalence and make it more accurate by skipping unnecessary comparisons.
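
The difference between the two comparisons can be sketched as follows. This is an illustration only, not the ThreadEquivalence code: threads are modeled as lists of matched labels, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a thread is modeled as the list of labels it has matched so far.
public class EquivalenceSketch
{
    // Positions in the thread that carry one of the given labels.
    static List<Integer> positions(List<String> thread, Set<String> labels)
    {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < thread.size(); i++) {
            if (labels.contains(thread.get(i))) {
                result.add(i);
            }
        }
        return result;
    }

    // Sufficient for a no-classifier aggregation such as max(U.price).
    public static boolean samePositions(List<String> first, List<String> second, Set<String> labels)
    {
        return positions(first, labels).equals(positions(second, labels));
    }

    // Needed for a classifier aggregation such as array_agg(CLASSIFIER(U)).
    public static boolean samePositionsAndLabels(List<String> first, List<String> second, Set<String> labels)
    {
        List<Integer> positions = positions(first, labels);
        if (!positions.equals(positions(second, labels))) {
            return false;
        }
        for (int position : positions) {
            if (!first.get(position).equals(second.get(position))) {
                return false;
            }
        }
        return true;
    }
}
```

For U = {A, B}, the threads [A, B] and [B, A] have the same positions, so they agree on max(U.price), but they differ on array_agg(CLASSIFIER(U)).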

*/
package io.trino.operator.window.pattern;

public class MatchAggregationPointer
Member

What does this represent?

Member Author

MatchAggregationPointer replaces AggregationValuePointer after the optimization phase is finished.
It is an accessor for an aggregation at execution time. At that point, all MatchAggregations are gathered in an array, and MatchAggregationPointer contains an index into that array.
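
As a rough illustration of an index-based accessor, here is a hypothetical sketch in which a LongSupplier stands in for an executable MatchAggregation; the class and method names are not from the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; LongSupplier stands in for an executable MatchAggregation.
public class AggregationPointerSketch
{
    private final int index; // position of the aggregation in the gathered array

    public AggregationPointerSketch(int index)
    {
        this.index = index;
    }

    // Looks up the aggregation by index and evaluates it.
    public long evaluate(LongSupplier[] aggregations)
    {
        return aggregations[index].getAsLong();
    }
}
```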

@kasiafi kasiafi force-pushed the 263PatternAggregations branch 2 times, most recently from 777d4df to 656a27f Compare December 2, 2021 14:05
checkArgument(currentRow >= patternStart && currentRow < patternStart + matchedLabels.length(), "current row is out of bounds of the match");
checkState(aggregated <= evaluated && evaluated <= matchedLabels.length(), "SetEvaluator in inconsistent state");

IntList positions = new IntList(DEFAULT_CAPACITY);
Member

Isn't this a subset of allPositions? Why not return an ArrayView over a portion of that, instead?

Member Author

positions are relative to partition start, and allPositions are relative to match start.
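
In other words, a view over allPositions would expose match-relative values, while the caller needs partition-relative ones. A minimal sketch of the conversion, assuming the offset is the patternStart seen in the check above (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the offset between the two coordinate systems.
public class PositionOffsetSketch
{
    // Converts match-relative positions to partition-relative ones by adding the match start.
    public static List<Integer> toPartitionPositions(List<Integer> matchPositions, int patternStart)
    {
        List<Integer> result = new ArrayList<>(matchPositions.size());
        for (int position : matchPositions) {
            result.add(patternStart + position);
        }
        return result;
    }
}
```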

@kasiafi kasiafi force-pushed the 263PatternAggregations branch from 656a27f to 93f05e0 Compare December 3, 2021 10:40
This rule identifies and pre-projects all eligible arguments
of aggregations in pattern recognition expressions.
Its purpose is to limit the number of runtime-evaluated
arguments.
After this change, the pattern recognition expressions
are processed by rewrite-based optimizations such as
SimplifyExpressions or Desugar... transformations.

The change applies to:
- top-level pattern recognition expressions
- aggregation arguments in pattern recognition context
@kasiafi kasiafi force-pushed the 263PatternAggregations branch from 93f05e0 to 42c96aa Compare December 3, 2021 18:02
@kasiafi kasiafi merged commit f2b0a4c into trinodb:master Dec 3, 2021
@kasiafi kasiafi mentioned this pull request Dec 3, 2021
12 tasks
@github-actions github-actions bot added this to the 365 milestone Dec 3, 2021
3 participants