1TB TPCDS benchmark over Vanilla Spark, suffer performance slowdown #588

Closed
DamonZhao-sfu opened this issue Jun 19, 2024 · 13 comments
Labels
enhancement New feature or request

Comments

@DamonZhao-sfu

DamonZhao-sfu commented Jun 19, 2024

What is the problem the feature request solves?

I'm running the 1TB TPC-DS benchmark on Comet and on vanilla Spark, using a machine with 48 cores and 186 GB of RAM.
Here's my config:

/localhdd/hza214/spark-3.4/spark-3.4.2-bin-hadoop3/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.comet.batchSize=8192 \
    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.comet.parquet.io.enabled=false \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=50g \
    --conf spark.shuffle.file.buffer=128k \
    --conf spark.local.dir=/mnt/smartssd_0n/hza214/sparktmp \
    --num-executors 48 \
    --executor-cores 48 \
    --driver-memory 20g \
    --executor-memory 140g

image

Has the open-source community run the TPC-DS benchmark on Comet before? I understand the project is still at a relatively early stage, so many native operators are not supported yet. Is this slowdown expected?

Describe the potential solution

No response

Additional context

No response

@DamonZhao-sfu DamonZhao-sfu added the enhancement New feature or request label Jun 19, 2024
@viirya
Member

viirya commented Jun 20, 2024

Please follow the benchmark guide https://datafusion.apache.org/comet/contributor-guide/benchmarking.html to set up proper Comet configs. With your configs, most native operators wouldn't be enabled.

@viirya
Member

viirya commented Jun 20, 2024

@andygrove created a repo, https://github.com/apache/datafusion-benchmarks, that includes the scripts used to benchmark Comet. You can also follow the steps there.

@andygrove
Member

Hi @DamonZhao-sfu we are working on some items in the 0.1.0 milestone that will likely help, particularly #591 and #387

I am not personally planning on spending much time on TPC-DS until we have some of these issues resolved.

@DamonZhao-sfu
Author

> Hi @DamonZhao-sfu we are working on some items in the 0.1.0 milestone that will likely help, particularly #591 and #387
>
> I am not personally planning on spending much time on TPC-DS until we have some of these issues resolved.

Thank you for your reply. I will try again when these features are merged. When I follow the benchmark guide (https://datafusion.apache.org/comet/contributor-guide/benchmarking.html) and use the same configuration, I still find that many aggregate and join operators are not supported natively, and they run much slower than in vanilla Spark. I'm currently writing a paper benchmarking different Spark native engines. It seems that the Comet community is currently focusing on TPC-H optimization? Will more of the native operators needed for TPC-DS be supported in the future?
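One rough way to quantify "many operators are not supported natively" is to count how many operators in a physical plan's text carry a Comet name versus a plain Spark name. The following is a minimal sketch, not part of Comet: it assumes native operators are recognizable by a `Comet` prefix (as in `CometScan` or `CometProject`), and both `summarize_plan` and the sample plan text are made up for illustration.

```python
import re
from collections import Counter

# Matches the operator name at the start of a plan line, after optional
# tree-drawing characters ("+- ", ":  ") and an optional whole-stage
# codegen marker such as "*(2) ". Header lines like "== Physical Plan =="
# do not match because "=" is not an accepted prefix character.
_OPERATOR = re.compile(r"^[\s:|+\-!]*(?:\*\(\d+\)\s*)?([A-Za-z]\w*)")


def summarize_plan(plan_text: str) -> Counter:
    """Count plan operators by whether their name starts with 'Comet'.

    Assumption: a 'Comet' prefix marks a natively executed operator.
    """
    counts = Counter()
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match is None:
            continue  # blank line or header
        name = match.group(1)
        counts["comet" if name.startswith("Comet") else "spark"] += 1
    return counts


# Hypothetical plan text, written by hand for this example only.
SAMPLE_PLAN = """\
== Physical Plan ==
*(2) HashAggregate(keys=[k#1], functions=[count(1)])
+- Exchange hashpartitioning(k#1, 200)
   +- CometProject [k#1]
      +- CometFilter isnotnull(k#1)
         +- CometScan parquet [k#1]
"""

summary = summarize_plan(SAMPLE_PLAN)
print(summary)
```

Run against the text of a real query plan, a high `spark` count in the aggregate and join stages would point to the fallbacks described above. The `spark.comet.explainFallback.enabled=true` setting already used in this thread is the more direct way to see why an operator fell back.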

@andygrove
Member

> Thank you for your reply. I will try again when these features are merged. When I follow the benchmark guide (https://datafusion.apache.org/comet/contributor-guide/benchmarking.html) and use the same configuration, I still find that many aggregate and join operators are not supported natively, and they run much slower than in vanilla Spark. I'm currently writing a paper benchmarking different Spark native engines. It seems that the Comet community is currently focusing on TPC-H optimization? Will more of the native operators needed for TPC-DS be supported in the future?

Yes, we are aiming for full TPC-DS support. We are just starting with TPC-H because it is easier for contributors to get up and running with that benchmark and is good enough to highlight some current limitations.

@andygrove
Member

Hi @DamonZhao-sfu. For query 72, are you enabling CBO in Spark or using any form of join reordering or are you using the official version of the query that joins catalog_sales to inventory first? I am asking because your times for q72 (in both Spark and Comet) are faster than I am seeing locally.

@andygrove
Member

@DamonZhao-sfu could you also provide the configs you used for the Spark run? I am seeing most queries running faster with Comet (but at 100GB) and would like to try and reproduce your results.

tpch_queries_speedup

@DamonZhao-sfu
Author

DamonZhao-sfu commented Jul 19, 2024

> Hi @DamonZhao-sfu. For query 72, are you enabling CBO in Spark or using any form of join reordering or are you using the official version of the query that joins catalog_sales to inventory first? I am asking because your times for q72 (in both Spark and Comet) are faster than I am seeing locally.

No, I did not enable CBO or join reordering. I'm using the official version. Here's my SQL:

select  i_item_desc
      ,w_warehouse_name
      ,d1.d_week_seq
      ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
      ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
      ,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
  and inv_quantity_on_hand < cs_quantity 
  and d3.d_date > d1.d_date + 5
  and hd_buy_potential = '1001-5000'
  and d1.d_year = 2001
  and cd_marital_status = 'M'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
 LIMIT 100 ;


@DamonZhao-sfu
Author

DamonZhao-sfu commented Jul 19, 2024

> @DamonZhao-sfu could you also provide the configs you used for the Spark run? I am seeing most queries running faster with Comet (but at 100GB) and would like to try and reproduce your results.
>
> tpch_queries_speedup

Here's my config:

export COMET_JAR=/localhdd/hza214/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
export SPARK_LOCAL_DIRS=/mnt/smartssd_0n/hza214/sparktmp
INFLUXDB_ENDPOINT=`hostname`
cat tpcds_parquet.scala | /localhdd/hza214/spark-3.4/spark-3.4.2-bin-hadoop3/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.comet.xxhash64.enabled=true \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.comet.batchSize=8192 \
    --conf spark.sql.autoBroadcastJoinThreshold=-1 \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.parquet.io.enabled=false \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.sql.adaptive.coalescePartitions.enabled=false \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=native \
    --conf spark.memory.offHeap.size=50g \
    --conf spark.shuffle.file.buffer=128k \
    --conf spark.local.dir=/mnt/smartssd_0n/hza214/sparktmp \
    --executor-cores 48 \
    --driver-memory 10g \
    --executor-memory 140g

Later, others advised me to use fewer cores per executor so that more executors can run in parallel on each node.
I'm using a 4-node cluster; each node has 48 cores, 196 GB of memory, and an SSD as the local disk. I have not tested at the 100GB scale yet; let me try to reproduce it. Also, would you mind sharing your Spark and server configs, @andygrove?
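The advice about fewer cores per executor comes down to simple arithmetic: divide a node's cores and memory evenly across several smaller executors. The sketch below illustrates that split for a node like the ones described here. `plan_executors`, the 8-core executor size, and the reserved headroom values are all assumptions chosen for illustration, not recommendations from the Comet or Spark projects.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecutorLayout:
    executors_per_node: int
    cores_per_executor: int
    memory_per_executor_gb: int


def plan_executors(node_cores: int, node_memory_gb: int,
                   cores_per_executor: int,
                   reserved_cores: int = 1,
                   reserved_memory_gb: int = 16) -> ExecutorLayout:
    """Split one node's cores and memory evenly across executors.

    reserved_cores and reserved_memory_gb are assumed headroom for the OS
    and other daemons; the defaults are illustrative only.
    """
    usable_cores = node_cores - reserved_cores
    executors = usable_cores // cores_per_executor
    if executors < 1:
        raise ValueError("cores_per_executor exceeds the usable cores")
    usable_memory_gb = node_memory_gb - reserved_memory_gb
    return ExecutorLayout(executors, cores_per_executor,
                          usable_memory_gb // executors)


# A 48-core, 196 GB node with hypothetical 8-core executors.
layout = plan_executors(node_cores=48, node_memory_gb=196,
                        cores_per_executor=8)
print(layout)
```

The per-executor memory computed here is a total budget. Since `spark.executor.memory` covers the JVM heap and `spark.memory.offHeap.size` is allocated separately, both would need to fit inside that figure.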

@andygrove
Member

Thanks @DamonZhao-sfu. We just updated our benchmarking guide with the currently recommended configs for the latest Comet code. Could you build with latest code and try with these settings?

Here are the results we are seeing for TPC-DS. They are not very impressive yet, but we do see a small speedup with Comet.

https://datafusion.apache.org/comet/contributor-guide/benchmarking.html#tpc-ds

@andygrove andygrove self-assigned this Jul 19, 2024
@DamonZhao-sfu
Author

@andygrove could you provide the optimized join order version of q72? Thanks!

@andygrove
Member

@DamonZhao-sfu We just released Comet 0.2.0, which provides some performance improvements for TPC-DS. We are now starting to test with the 1TB data set as well.

https://datafusion.apache.org/blog/2024/08/28/datafusion-comet-0.2.0/

@andygrove
Member

This issue is from before the 0.1.0 release and there have been some improvements to TPC-DS since then. We have an epic for improving TPC-DS so I think we can close this issue now.

#858
