Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve count aggregate performance #784

Merged
merged 10 commits into from
Aug 6, 2024

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Aug 6, 2024

Which issue does this PR close?

Closes #744

Rationale for this change

For some reason, COUNT is really slow when used from Comet, but SUM is fast, so let's translate COUNT(expr) to SUM(IF(expr IS NULL, 0, 1)) until we can get to the bottom of the real issue.

edit: It turns out that Spark also implements COUNT this way, so I think this closes the issue.

Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                    1716           1728          17          6.1         163.7       1.0X
SQL Parquet - Comet (Scan) (COUNT)                                                             1677           1680           4          6.3         159.9       1.0X
SQL Parquet - Comet (Scan, Exec) (COUNT)                                                        782            800          27         13.4          74.6       2.2X

What changes are included in this PR?

How are these changes tested?

@andygrove andygrove changed the title experiment: workaround for count aggregate performance issue perf: workaround for count aggregate performance issue Aug 6, 2024
@andygrove andygrove marked this pull request as ready for review August 6, 2024 01:46
@andygrove
Copy link
Member Author

andygrove commented Aug 6, 2024

This PR results in an 11% speedup overall for TPC-DS @ 100 GB (based on a single run). Runs do vary, so this may be exaggerating the speedup. I will run with more iterations and post more results.

edit: this was run with comet.debug.enabled by mistake

tpcds_allqueries

@andygrove
Copy link
Member Author

andygrove commented Aug 6, 2024

Average of 3 runs, main branch versus this PR. This shows a 15.5% speedup.

tpcds_allqueries

Command used for both runs:

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.memory=32G \
    --conf spark.executor.cores=8 \
    --conf spark.cores.max=8 \
    --conf spark.eventLog.enabled=true \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.shuffle.enforceMode.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    tpcbench.py \
    --benchmark tpcds \
    --data /mnt/bigdata/tpcds/sf100/ \
    --queries ../../tpcds/queries-spark \
    --iterations 3

@andygrove andygrove requested review from huaxingao and viirya August 6, 2024 04:58
Copy link
Contributor

@huaxingao huaxingao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the PR @andygrove

.iter()
.map(|child| self.create_expr(child, schema.clone()))
.collect::<Result<Vec<_>, _>>()?;
if expr.children.iter().len() == 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think we can also do this for multiple child expressions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have extended this approach for the multiple argument case.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks okay. Actually it is how Spark count does internally:

 /* count = */ If(nullableChildren.map(IsNull).reduce(Or), count, count + 1L)

@andygrove andygrove changed the title perf: workaround for count aggregate performance issue perf: Improve count aggregate performance Aug 6, 2024
@andygrove andygrove merged commit d9dc117 into apache:main Aug 6, 2024
76 checks passed
@andygrove andygrove deleted the count-perf-workaround branch August 6, 2024 13:47
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* Workaround for COUNT performance

* add comments

* remove benchmark results

* fix regression

* revert change to datafusion version

* Revert change to Cargo.lock

* fix

* unify code for single and multiple arguments

* clippy
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Improve performance of COUNT aggregates
3 participants