Fix incorrect plan property derivations #15154

Member

@pettyjamesm pettyjamesm commented Nov 22, 2022

Description

Fix a correctness issue in which property derivation for local exchanges incorrectly propagated non-constant local properties across the exchange.

Example Query:

WITH students_results(student_id, course_id, grade) AS (VALUES
    (1000, 100, 17),
    (2000, 200, 16),
    (3000, 300, 18),
    (1000, 100, 18),
    (2000, 100, 10),
    (3000, 200, 20),
    (1000, 200, 16),
    (2000, 300, 12),
    (3000, 100, 17),
    (2000, 200, 15),
    (3000, 100, 18),
    (1000, 300, 12),
    (3000, 100, 20),
    (1000, 300, 16),
    (2000, 100, 12))
SELECT student_id, course_id, cnt, avg_w_sum,
    avg(sum_w_sum) OVER (
        ORDER BY student_id, course_id
        ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
    ) AS avg_w
FROM (
    SELECT
        student_id, course_id, count(*) AS cnt,
        sum(sum(grade)) OVER (
            ORDER BY student_id, course_id
            ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
        ) AS avg_w_sum,
        sum(sum(grade)) OVER (
            PARTITION BY student_id
        ) AS sum_w_sum
    FROM students_results
    GROUP BY student_id, course_id
) AS t
ORDER BY student_id, course_id

Before this fix, Trino would produce incorrect (and non-deterministic) results like the following:

 student_id | course_id | cnt | avg_w_sum | avg_w
------------+-----------+-----+-----------+-------
       2000 |       100 |   2 |        97 |  65.0
       2000 |       200 |   2 |        93 |  65.0
       2000 |       300 |   1 |       120 |  68.5
       1000 |       100 |   2 |        51 |  72.0
       1000 |       200 |   1 |        79 |  75.5
       1000 |       300 |   2 |       101 |  82.5
       3000 |       100 |   3 |       118 |  86.0
       3000 |       200 |   1 |       105 |  89.5
       3000 |       300 |   1 |        93 |  93.0

Instead of the expected correct output:

 student_id | course_id | cnt | avg_w_sum | avg_w
------------+-----------+-----+-----------+-------
       1000 |       100 |   2 |        51 |  79.0
       1000 |       200 |   1 |        79 |  79.0
       1000 |       300 |   2 |       101 |  75.5
       2000 |       100 |   2 |        97 |  72.0
       2000 |       200 |   2 |        93 |  68.5
       2000 |       300 |   1 |       120 |  72.0
       3000 |       100 |   3 |       118 |  79.0
       3000 |       200 |   1 |       105 |  86.0
       3000 |       300 |   1 |        93 |  93.0

Note that in this example, the query results are in the wrong order according to the top-level ORDER BY, and the window function results are incorrect, because the WindowNode in the plan was constructed with preSortedOrderPrefix=2, essentially declaring that its input was pre-sorted by student_id, course_id when it was not. As a result, the WindowOperator does not think it needs to sort its output (causing the incorrect row order in the results) and attempts to find window frame boundaries in its input (causing incorrect window function results).
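To make the effect on avg_w concrete, here is a minimal sketch (not Trino code; the class and method names are made up for illustration) that applies the ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING frame to the per-student sum_w_sum values from the example (79 for student 1000, 65 for 2000, 93 for 3000), once in the correct row order and once in the order seen in the incorrect output. It assumes the frame is evaluated over rows exactly in the order they arrive.

```java
import java.util.ArrayList;
import java.util.List;

class PreSortedFrameDemo {
    // Average over ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, trusting the given row order.
    static List<Double> frameAverages(List<Integer> values) {
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            int start = Math.max(0, i - 2);
            int end = Math.min(values.size() - 1, i + 1);
            double sum = 0;
            for (int j = start; j <= end; j++) {
                sum += values.get(j);
            }
            result.add(sum / (end - start + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        // sum_w_sum per row, three (student_id, course_id) rows per student.
        List<Integer> sortedInput = List.of(79, 79, 79, 65, 65, 65, 93, 93, 93);   // 1000, 2000, 3000
        List<Integer> unsortedInput = List.of(65, 65, 65, 79, 79, 79, 93, 93, 93); // 2000, 1000, 3000

        // Matches the avg_w column of the expected output.
        System.out.println(frameAverages(sortedInput));
        // Matches the avg_w column of the incorrect output.
        System.out.println(frameAverages(unsortedInput));
    }
}
```

Both avg_w columns above fall out of the same frame logic; only the input order differs, which is consistent with the operator skipping a sort it was told it did not need.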

This bug was caused by commit f57b8a4 introduced as part of #6634 and released in Trino 358, which incorrectly propagated sort and partitioning local properties across the exchange boundary.

As @martint correctly noted below, ActualProperties.isEffectivelySingleStream() does not mean the stream is actually a single stream (and is probably poorly named). In the case of a gathering exchange, if multiple sub-streams each produce data in sort order before the local exchange, the rows will not be emitted from the exchange preserving that order unless that exchange is a "merging" exchange (which in this case it is not, because of the ExchangeNode#getOrderingScheme().isEmpty() check). Only "constant" local properties can safely and correctly be propagated across local exchanges like this.
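The difference between a merging and a non-merging gather can be sketched as follows. This is an illustrative model only, not Trino's exchange implementation: the class and method names are hypothetical, and the non-merging gather is modeled as taking rows from the sub-streams in round-robin arrival order, which is just one of many possible interleavings.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class GatherDemo {
    // Non-merging gather: rows are emitted in arrival order (modeled here as round-robin).
    static List<Integer> gather(List<List<Integer>> streams) {
        List<Integer> out = new ArrayList<>();
        int longest = streams.stream().mapToInt(List::size).max().orElse(0);
        for (int i = 0; i < longest; i++) {
            for (List<Integer> stream : streams) {
                if (i < stream.size()) {
                    out.add(stream.get(i));
                }
            }
        }
        return out;
    }

    // Merging gather: k-way merge that always emits the smallest head row.
    static List<Integer> mergingGather(List<List<Integer>> streams) {
        // Queue entries are {streamIndex, positionInStream}, ordered by the row they point at.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                Comparator.comparingInt((int[] head) -> streams.get(head[0]).get(head[1])));
        for (int s = 0; s < streams.size(); s++) {
            if (!streams.get(s).isEmpty()) {
                heads.add(new int[] {s, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<Integer> stream = streams.get(head[0]);
            out.add(stream.get(head[1]));
            if (head[1] + 1 < stream.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two sub-streams, each individually sorted.
        List<List<Integer>> streams = List.of(List.of(1, 4, 7), List.of(2, 3, 9));
        System.out.println(gather(streams));        // interleaved, not globally sorted
        System.out.println(mergingGather(streams)); // globally sorted
    }
}
```

Each input is sorted on its own, but only the merging variant can claim a sorted output, which is why the sort property should not be carried across the non-merging exchange.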

Release notes

( ) This is not user-visible or docs only and no release notes are required.
(x) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

I'm not sure exactly how to characterize this for the purposes of release notes since the scope of the potential impact of this bug on query correctness is potentially very broad. Suggestions for the release note welcome.

# General
* Fix a correctness bug introduced in Trino 358 by ({issue}`6634`)

@martint
Member

martint commented Nov 22, 2022

The underlying cause is that a local gathering exchange (non-merging) does not preserve grouping and sorting properties, only constants.

@martint
Member

martint commented Nov 23, 2022

Can you add a test with the query you were showing me earlier today?

@pettyjamesm
Member Author

Yep, need to try to understand if there’s a more minimized version that can be made and what assertions need to be made on that version to make it a little less of a regression test scenario.

@raunaqmorarka
Member

The underlying cause is that a local gathering exchange (non-merging) does not preserve grouping and sorting properties, only constants.

There is also the inputProperty.isEffectivelySingleStream() condition. Why wouldn't sorting and grouping be preserved when the input data is a single stream?

Member

@sopel39 sopel39 left a comment

Per @raunaqmorarka, we already have the isEffectivelySingleStream check, so I'm not sure why there would be a problem here.

@@ -661,10 +661,11 @@ public ActualProperties visitExchange(ExchangeNode node, List<ActualProperties>
if (inputProperty.isEffectivelySingleStream() && node.getOrderingScheme().isEmpty()) {
Member

Please improve the commit message with a description of what was wrong, preferably with an example.

@martint
Member

martint commented Nov 23, 2022

isEffectivelySingleStream does not do what you think it does. It just checks whether the stream is unpartitioned. In the case of a local gathering exchange, the input might not be partitioned on a column, but the engine might schedule multiple drivers for that pipeline that feed into the exchange (effectively, partitioned on an arbitrary function) if task parallelism is > 1.

@sopel39
Member

sopel39 commented Nov 23, 2022

It just checks whether the stream is unpartitioned

@martint The check for isEffectivelySingleStream is: streamPartitioning.isPresent() && streamPartitioning.get().isEffectivelySinglePartition(constants) && !nullsAndAnyReplicated.

The stream:

  1. must be partitioned
  2. there must be a single partition effectively
  3. no replication

Hence any property of that single stream should be held across exchange.

@pettyjamesm
Member Author

I've updated the PR description with more details including an example query and the previous incorrect results.

@sopel39 - your assumption that isEffectivelySingleStream() requires the stream to be partitioned is not correct. Unpartitioned streams or streams partitioned only on constants will return true.

@pettyjamesm
Member Author

pettyjamesm commented Nov 23, 2022

@raunaqmorarka - This fix triggers a regression test added by #8535 which suggests there may be other bugs introduced by #6634. I may need some help root causing the test failures to identify the necessary related fix(es). Seems like the same issue you mentioned in this comment.

@sopel39
Member

sopel39 commented Nov 23, 2022

Unpartitioned streams or streams partitioned only on constants will return true.

Maybe that's true, but even the method's Javadoc suggests otherwise:

        /**
         * @return true if all the data will effectively land in a single stream
         */

Could you describe why for unpartitioned stream this method will return true?

@pettyjamesm
Member Author

Could you describe why for unpartitioned stream this method will return true?

Because ActualProperties.isEffectivelySingleStream() delegates to ActualProperties.Global.isEffectivelySingleStream(constants), which delegates to Partitioning.isEffectivelySinglePartition(constants), which calls Partitioning.isPartitionedOn(ImmutableList.of(), constants).

The last two methods in particular should make it obvious what will happen for streams partitioned on constants or not partitioned at all, and none of the logic guarantees that there will actually be only a single stream.
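A simplified model of that last step may help. The sketch below is not the Trino source: the class is a hypothetical stand-in that represents a partitioning as a plain list of column names, and it assumes isPartitionedOn accepts a partitioning when every partitioning column is either a known constant or one of the requested columns.

```java
import java.util.List;
import java.util.Set;

class SinglePartitionModel {
    // Hypothetical stand-in for a partitioning: only the partitioning column names.
    static final class Partitioning {
        private final List<String> columns;

        Partitioning(List<String> columns) {
            this.columns = List.copyOf(columns);
        }

        // Assumed rule: constant columns are ignored, every other column must be requested.
        boolean isPartitionedOn(List<String> requested, Set<String> constants) {
            for (String column : columns) {
                if (!constants.contains(column) && !requested.contains(column)) {
                    return false;
                }
            }
            return true;
        }

        // Mirrors the delegation described above: ask about an empty column list.
        boolean isEffectivelySinglePartition(Set<String> constants) {
            return isPartitionedOn(List.of(), constants);
        }
    }

    public static void main(String[] args) {
        // No partitioning columns at all: the loop never rejects, so the answer is true.
        System.out.println(new Partitioning(List.of()).isEffectivelySinglePartition(Set.of()));
        // Partitioned only on a constant column: also true.
        System.out.println(new Partitioning(List.of("c")).isEffectivelySinglePartition(Set.of("c")));
        // Partitioned on a non-constant column: false.
        System.out.println(new Partitioning(List.of("student_id")).isEffectivelySinglePartition(Set.of()));
    }
}
```

Nothing in this check looks at how many drivers or streams actually exist, which is the gap being pointed out.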

@pettyjamesm pettyjamesm force-pushed the fix-incorrect-local-exchange-properties branch from da8b9bb to 4ef4e65 Compare November 23, 2022 17:47
@pettyjamesm
Member Author

I've pushed a new commit. In addition to the initial LocalExchange property derivation fix, there was also a bug that would attach sort properties to WindowNode instances based on their orderBy specification. WindowOperator does not emit output rows in the order of its internal orderBy specification.

This bug has been present in Presto/Trino since Presto version 0.101, introduced in commit c5a3bfc - but became much more likely to cause correctness issues after the changes introduced in #6634

@pettyjamesm pettyjamesm changed the title from "Fix property derivations on local exchanges" to "Fix incorrect plan property derivations" Nov 23, 2022
Window nodes do not emit output rows based on their orderBy
specification and therefore should not attach a sorting property
as part of PropertyDerivations.
LocalExchanges should not propagate grouping or ordering local
properties from one side to another.
@pettyjamesm pettyjamesm force-pushed the fix-incorrect-local-exchange-properties branch from 4ef4e65 to eff37ff Compare November 23, 2022 18:31
@pettyjamesm
Member Author

@martint / @raunaqmorarka - After fixing invalid WindowNode sorting properties, tests fail in TestEliminateSorts. The new plan is valid, but seems much worse than a simple TopN sequence. See plan output details here: https://github.com/trinodb/trino/actions/runs/3534747180/jobs/5932413709

@martint
Member

martint commented Nov 24, 2022

There are still a few issues we need to sort out with this fix, so my suggestion for the short term would be to add a config/session property to disable the optimization added in #6634 and turn off the optimization by default.

@martint
Member

martint commented Nov 24, 2022

The properties for WindowNode are a little more complicated. Currently, it guarantees that the output will be sorted by the partition-by + order-by columns. However, if the input is pre-grouped, it will be only grouped by the pre-grouped columns and sorted by the remaining partition-by + order by columns.

@sopel39
Member

sopel39 commented Nov 24, 2022

The last two methods in particular should make it obvious what will happen for streams partitioned on constants or not partitioned at all, and none of the logic guarantees that there will actually be only a single stream.

@pettyjamesm
So for unpartitioned stream there will be streamPartitioning.isPresent()?

I still don't see a problem with io.trino.sql.planner.Partitioning#isEffectivelySinglePartition method. Even when partitioning doesn't have any arguments, it means data should be streamed by single stream (essentially a constant partitioning), so the method correctly returns true

@@ -661,10 +659,11 @@ public ActualProperties visitExchange(ExchangeNode node, List<ActualProperties>
if (inputProperty.isEffectivelySingleStream() && node.getOrderingScheme().isEmpty()) {
Member

I think isEffectivelySingleStream() might be incorrectly returning true here.

raunaqmorarka added a commit to raunaqmorarka/presto that referenced this pull request Nov 24, 2022
This optimization can cause a correctness issue identified in
trinodb#15154
Attempting to fix that leads to other unresolved issues around properties
of WindowNode.
So this optimization is being disabled by default until we resolve all
the known issues related to it.
@pettyjamesm
Member Author

@pettyjamesm So for unpartitioned stream there will be streamPartitioning.isPresent()?

Yes, it can be present.

I still don't see a problem with io.trino.sql.planner.Partitioning#isEffectivelySinglePartition method. Even when partitioning doesn't have any arguments, it means data should be streamed by single stream (essentially a constant partitioning), so the method correctly returns true

Unpartitioned streams are allowed to be parallelized beyond a single stream, in the case of this particular reproducing query- by fixed arbitrary distribution (round robin) local exchanges between gathering exchange sequences.
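A small sketch of why that matters for grouping as well as sorting. This is an illustrative model with hypothetical names, not engine code: rows stand in for pages, the fixed arbitrary distribution is modeled as dealing rows out in turn, and the gathering exchange is modeled as draining one stream after another.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RoundRobinGroupingDemo {
    // Deal rows out to streamCount streams in turn (stand-in for a round robin local exchange).
    static List<List<String>> roundRobin(List<String> rows, int streamCount) {
        List<List<String>> streams = new ArrayList<>();
        for (int i = 0; i < streamCount; i++) {
            streams.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            streams.get(i % streamCount).add(rows.get(i));
        }
        return streams;
    }

    // Non-merging gather, modeled as draining the streams one after another.
    static List<String> gather(List<List<String>> streams) {
        List<String> out = new ArrayList<>();
        for (List<String> stream : streams) {
            out.addAll(stream);
        }
        return out;
    }

    // True when every key appears in exactly one contiguous run.
    static boolean isGrouped(List<String> keys) {
        Set<String> seen = new HashSet<>();
        String previous = null;
        for (String key : keys) {
            if (!key.equals(previous)) {
                if (!seen.add(key)) {
                    return false; // key started a second run
                }
                previous = key;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> grouped = List.of("a", "a", "b", "b", "c", "c");
        List<String> afterExchanges = gather(roundRobin(grouped, 2));
        System.out.println(isGrouped(grouped));        // true
        System.out.println(afterExchanges);            // [a, b, c, a, b, c]
        System.out.println(isGrouped(afterExchanges)); // false
    }
}
```

The input has no partitioning columns at every step, yet the grouping it started with does not survive the round trip through two streams.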

@sopel39
Member

sopel39 commented Nov 24, 2022

Unpartitioned streams are allowed to be parallelized beyond a single stream, in the case of this particular reproducing query- by fixed arbitrary distribution (round robin) local exchanges between gathering exchange sequences.

Seems like we really should fix isEffectivelySinglePartition to not return true on fixed arbitrary distribution or scaled writer round robin distribution

@pettyjamesm
Member Author

Seems like we really should fix isEffectivelySinglePartition to not return true on fixed arbitrary distribution or scaled writer round robin distribution

I agree that the method name is very misleading, at least from the perspective of someone not familiar with this part of the code base (including me). Maybe the notion of “effectively single stream” is useful in some context other than “actually single stream”, but if so, it probably deserves a different name that makes the distinction clearer.

@sopel39
Member

sopel39 commented Nov 25, 2022

Here is my fix proposal (#15194), which fixes the underlying issue with isEffectivelySinglePartition.

@sopel39
Member

sopel39 commented Nov 28, 2022

Fixed via #15203

@sopel39 sopel39 closed this Nov 28, 2022
@pettyjamesm pettyjamesm deleted the fix-incorrect-local-exchange-properties branch March 24, 2023 18:28