
[GLUTEN-4421][VL] Disable flushable aggregate when input is already partitioned by grouping keys #4443

Merged
9 commits merged into apache:main on Jan 19, 2024

Conversation

@zhztheplayer (Member) commented Jan 18, 2024

If the child's output is already partitioned by the aggregation keys (i.e., this function returns true), we should avoid the optimization that converts the aggregate into a flushable aggregation.

For example, if the input is hash-partitioned by keys (a, b) and the aggregate node requests "group by a, b, c", then the aggregate should NOT flush, because each grouping-set value of (a, b, c) will be produced on only a single partition across the whole cluster. Spark's planner may rely on this property to perform optimizations such as applying "partial_count(a, b, c)" directly on the output data.

This fixes #4421
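
Roughly, the guard looks like the following sketch, written against vanilla Spark's partitioning classes purely for illustration; the names and placement in the actual rule may differ:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

object FlushableAggGuard {
  // True when every hash-partitioning key of the child is one of the grouping
  // keys, i.e. all rows of a given group land in a single partition. In that
  // case the aggregate must not flush partial groups, because a downstream
  // operator (e.g. a partial_count placed directly on top) may assume it sees
  // one row per group.
  def childAlreadyPartitionedByGroupingKeys(
      childPartitioning: Partitioning,
      groupingKeys: Seq[Attribute]): Boolean = childPartitioning match {
    case hp: HashPartitioning =>
      hp.expressions.nonEmpty &&
        hp.expressions.forall(e => groupingKeys.exists(_.semanticEquals(e)))
    case _ => false
  }
}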

@zhztheplayer (Member Author):

/Benchmark Velox


Comment on lines +172 to +181
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.3 Q38 flush
  run: |
    $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh 'cd /opt/gluten/tools/gluten-it \
      && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
      --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 --queries=q38 \
      --disable-bhj \
      --extra-conf=spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.1 \
      --extra-conf=spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.2 \
      --extra-conf=spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100 \
      --extra-conf=spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0'
@zhztheplayer (Member Author):

Later we may develop a new way to arrange these gluten-it CI jobs. The yaml file size is exploding.

@zhztheplayer (Member Author):

/Benchmark Velox

@zhztheplayer marked this pull request as ready for review January 18, 2024 08:55
@GlutenPerfBot (Contributor):

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4443_time.csv log/native_master_01_17_2024_6e070aee2_time.csv difference percentage
q1 33.21 32.53 -0.678 97.96%
q2 24.14 25.15 1.012 104.19%
q3 38.40 35.63 -2.766 92.80%
q4 36.51 39.47 2.965 108.12%
q5 70.64 69.91 -0.726 98.97%
q6 8.04 7.16 -0.880 89.06%
q7 84.82 83.43 -1.397 98.35%
q8 84.89 86.98 2.089 102.46%
q9 121.60 125.57 3.971 103.27%
q10 44.75 42.09 -2.663 94.05%
q11 20.27 20.23 -0.045 99.78%
q12 24.96 27.47 2.508 110.05%
q13 45.98 44.85 -1.136 97.53%
q14 20.99 17.86 -3.134 85.07%
q15 28.30 29.77 1.468 105.19%
q16 14.18 13.95 -0.229 98.39%
q17 101.66 100.92 -0.735 99.28%
q18 148.63 146.37 -2.265 98.48%
q19 13.02 13.91 0.897 106.89%
q20 26.54 26.50 -0.036 99.87%
q21 224.71 226.25 1.543 100.69%
q22 13.57 13.71 0.135 101.00%
total 1229.82 1229.72 -0.100 99.99%

@zhztheplayer (Member Author):

/Benchmark Velox

@ulysses-you (Contributor):

I'm trying to understand this issue; please correct me if I'm wrong. What we want to fix in this PR is: do not convert the regular aggregate (2) to a flushable aggregate, because the flushable aggregate would output more than one row for the same group and thus make the partial count accumulator at (3) larger than expected.

SELECT c1, count(distinct c2), count(1) FROM t GROUP BY c1

(4) Aggregate [c1], [count(c2), count(1)]
  Shuffle c1
    (3) Aggregate [c1], [partial_count(c2), merge_count(1)]
      (2) Aggregate [c1, c2], [merge_count(1)]
         Shuffle c1, c2
           (1) Aggregate [c1, c2], [partial_count(1)]
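
To make the over-count concrete: if the flushable aggregate at (2) flushes the same group more than once, a downstream count that assumes one row per group is inflated. A minimal illustration with plain Scala collections (not the actual operator code):

object FlushOverCountDemo extends App {
  // Rows emitted by a flushable group-by on (c1, c2): group ("a", 1) was flushed twice.
  val flushedGroups = Seq(("a", 1), ("b", 2), ("a", 1))
  // A downstream count that assumes one row per distinct group over-counts:
  println(flushedGroups.size)          // 3
  // The true number of distinct groups is:
  println(flushedGroups.distinct.size) // 2
}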

@GlutenPerfBot (Contributor):

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4443_time.csv log/native_master_01_18_2024_7c853c61f_time.csv difference percentage
q1 33.23 31.92 -1.309 96.06%
q2 23.69 25.94 2.252 109.50%
q3 34.74 36.00 1.261 103.63%
q4 38.31 39.52 1.205 103.14%
q5 66.61 68.31 1.700 102.55%
q6 6.99 5.33 -1.657 76.29%
q7 84.30 83.20 -1.104 98.69%
q8 83.98 84.62 0.638 100.76%
q9 122.96 123.96 1.000 100.81%
q10 41.48 42.82 1.336 103.22%
q11 19.90 20.16 0.262 101.32%
q12 28.66 26.88 -1.780 93.79%
q13 45.68 44.35 -1.330 97.09%
q14 17.00 16.22 -0.775 95.44%
q15 28.44 28.70 0.262 100.92%
q16 14.07 13.92 -0.144 98.98%
q17 98.73 99.40 0.673 100.68%
q18 147.12 145.71 -1.408 99.04%
q19 12.52 12.60 0.084 100.67%
q20 26.00 26.33 0.326 101.26%
q21 223.06 225.76 2.703 101.21%
q22 13.64 13.59 -0.056 99.59%
total 1211.10 1215.24 4.141 100.34%

@zhztheplayer (Member Author):

> What we want to fix in this PR is: do not convert the regular aggregate (2) to a flushable aggregate, because the flushable aggregate would output more than one row for the same group and thus make the partial count accumulator at (3) larger than expected.

It's more or less the same as the issue this PR is trying to solve, except that Q38 generates a plan like the following:

// Spark 3.3
(2) Aggregate [partial_count]
 Shuffle ???
  (1) Aggregate [c1, c2]  // <- this should not be flushable
   Shuffle c1, c2

In Spark 3.2 the aggregate (1) can be flushable, since another distinct aggregation was generated on the reducer side:

// Spark 3.2
(2) Aggregate [partial_count]
  (3) Aggregate [c1, c2]
    Shuffle ???
     (1) Aggregate [c1, c2]  // <- this can be flushable
       Shuffle c1, c2

That's why the issue only shows up starting from Spark 3.3; there are likely some new optimizations in vanilla Spark.
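
For reference, a query with the same distinct-then-count shape can be used to check how a given Spark version plans it; this is a hypothetical repro sketch, not the actual TPC-DS Q38:

import org.apache.spark.sql.SparkSession

object Q38ShapeRepro extends App {
  val spark = SparkSession.builder().appName("q38-shape").master("local[*]").getOrCreate()
  spark.range(1000).selectExpr("id % 10 AS c1", "id % 7 AS c2").createOrReplaceTempView("t")
  // A distinct projection feeding a global count, roughly the shape shown above.
  spark.sql("SELECT count(1) FROM (SELECT DISTINCT c1, c2 FROM t) d").explain()
  spark.stop()
}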

@zhztheplayer (Member Author):

> What we want to fix in this PR is: do not convert the regular aggregate (2) to a flushable aggregate, because the flushable aggregate would output more than one row for the same group and thus make the partial count accumulator at (3) larger than expected.

BTW, I forgot to mention that the current code should already be able to handle this case even without this patch (we only convert an aggregate to a flushable aggregate when it is the one adjacent to the shuffle). Still, thanks for working out the example; it's valuable anyway.

@ulysses-you (Contributor):

The optimization since Spark 3.3 comes from apache/spark#35779. So should we avoid converting a regular aggregate to a flushable aggregate if it is a group-by-only aggregate and its adjacent parent is a partial aggregate?
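
Roughly, that narrower condition could look like the following sketch (written against vanilla Spark's HashAggregateExec purely for illustration; the node types used by Gluten's rule may differ):

import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object NarrowFlushGuard {
  // Skip the flushable conversion when the aggregate is group-by-only (no
  // aggregate functions) and its adjacent parent is a partial aggregate.
  def isGroupByOnlyFeedingPartialAgg(parent: SparkPlan, agg: HashAggregateExec): Boolean = {
    val groupByOnly =
      agg.aggregateExpressions.isEmpty && agg.groupingExpressions.nonEmpty
    val parentIsPartialAgg = parent match {
      case p: HashAggregateExec => p.aggregateExpressions.exists(_.mode == Partial)
      case _ => false
    }
    groupByOnly && parentIsPartialAgg
  }
}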

@zhztheplayer (Member Author):

> The optimization since Spark 3.3 comes from apache/spark#35779. So should we avoid converting a regular aggregate to a flushable aggregate if it is a group-by-only aggregate and its adjacent parent is a partial aggregate?

Thanks for the information.

I think we could indeed forbid flushing for the specific case apache/spark#35779 optimizes against, although I am wondering whether this patch could provide a more general fix.

When an aggregate can be considered to emit distinct data and thus propagate "distinct attributes", that distinct aggregation must be a "final distinct aggregation", which means it has to process data that is already partitioned by the distinct keys. Based on this assumption, the patch should be a correct fix (do correct me if I'm wrong).

Additionally, the fix could be considered "general" since it's not limited to distinct aggregation. For example, a partial sum aggregate can produce meaningful per-group data when its input is already partitioned by the grouping keys. This may not be a great example, since I doubt the Catalyst planner ever creates a plan that relies on this case, but the principle is to be more careful about using flushable aggregation, since vanilla Spark doesn't have this kind of optimization as of now.

@ulysses-you (Contributor):

This PR is a somewhat conservative fix for the issue: it fixes the issue but may miss the optimization in some other cases. For example, if the aggregate sits on top of a shuffled join with the same keys, then the partial aggregate will no longer be converted to a flushable one.

xxx
Shuffle
  Aggregate xxx
    Aggregate partial_xxx
      Shuffled Join
         Shuffle
         Shuffle

I'm fine with fixing it first since it's a data correctness issue, and doing further optimization in a follow-up PR.

@zhztheplayer (Member Author):

> This PR is a somewhat conservative fix for the issue: it fixes the issue but may miss the optimization in some other cases, e.g., if the aggregate sits on top of a shuffled join with the same keys, then the partial aggregate will no longer be converted to a flushable one. I'm fine with fixing it first since it's a data correctness issue, and doing further optimization in a follow-up PR.

I understand your point. I think this kind of plan couldn't be optimized with a flushable aggregate even without this patch, so let's keep enhancing the rule to cover more cases like that in future development iterations.

@ulysses-you (Contributor) left a review comment:

thank you for the fix

@zhztheplayer merged commit d1e6ead into apache:main on Jan 19, 2024
19 checks passed
@zhztheplayer (Member Author):

Thanks for reviewing!

@GlutenPerfBot (Contributor):

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4443_time.csv log/native_master_01_18_2024_7c853c61f_time.csv difference percentage
q1 33.16 31.92 -1.235 96.28%
q2 25.53 25.94 0.409 101.60%
q3 36.16 36.00 -0.155 99.57%
q4 37.77 39.52 1.749 104.63%
q5 69.34 68.31 -1.033 98.51%
q6 5.52 5.33 -0.186 96.64%
q7 84.29 83.20 -1.094 98.70%
q8 84.85 84.62 -0.233 99.73%
q9 121.23 123.96 2.728 102.25%
q10 42.28 42.82 0.539 101.28%
q11 19.58 20.16 0.581 102.97%
q12 27.09 26.88 -0.218 99.19%
q13 47.41 44.35 -3.055 93.56%
q14 16.36 16.22 -0.137 99.16%
q15 27.38 28.70 1.318 104.81%
q16 13.15 13.92 0.776 105.90%
q17 99.03 99.40 0.369 100.37%
q18 147.03 145.71 -1.320 99.10%
q19 12.49 12.60 0.112 100.90%
q20 26.60 26.33 -0.275 98.97%
q21 226.20 225.76 -0.443 99.80%
q22 13.73 13.59 -0.143 98.96%
total 1216.18 1215.24 -0.944 99.92%

Closes #4421: [VL] Flushable distinct agg caused correctness issue