Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Various metrics bug fixes and improvements #1111

Merged
merged 25 commits into from
Dec 2, 2024

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Nov 22, 2024

Which issue does this PR close?

Closes #1109
Closes #1003
Closes #1110
Closes #935

Rationale for this change

We currently drop some native metrics due to a design flaw in the current metrics code where we assume that the native plan is a 1:1 mapping with the Spark plan, which is often not true. See the issue for more details.

Improvement 1: Fix bug where metrics were being dropped in some cases

Here are before and after images for BulidRight hash join where we insert an extra projection on the native side, breaking the assumption that there is a 1:1 mapping between Spark plan and native plan:

Screenshot from 2024-11-22 11-44-21

Screenshot from 2024-11-22 14-21-34

Improvement 2: Report Arrow FFI time for passing batches from JVM to native

We now include the ScanExec time for transferring batches from JVM to native. The following example shows total scan time of 16.4 seconds but now also shows the additional 17.7 seconds for transferring those batches to native for the filter operation.

Screenshot from 2024-11-24 08-21-41

What changes are included in this PR?

The native planner now builds a tree of SparkPlan that is a 1:1 mapping with the original Spark plan. Each SparkPlan can reference multiple native plans that should be used for metrics collection.

How are these changes tested?

Existing tests, and new unit tests in the planner

@andygrove
Copy link
Member Author

@viirya @parthchandra @mbutrovich This is still WIP but let me know what you think of the overall approach here if you have time.

Current status is that we now log the metrics that we are dropping. Here are two examples from TPC-H q3.

We wrap an aggregate in a projection causing:

Dropping the AggregateExec elapsed_compute time of 1820330 for plan ProjectionExec (#624)

The input to a SortExec is a ScanExec to fetch the input batches from the JVM, and we drop those metrics:

Dropping the ScanExec elapsed_compute time of 1151562087 for plan SortExec (#0)

Comment on lines +969 to +974
Arc::new(SparkPlan::new_with_additional(
spark_plan.plan_id,
projection,
vec![child],
vec![aggregate],
)),
Copy link
Member Author

@andygrove andygrove Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example where we are currently dropping the aggregate metrics and just capturing the projection metrics

@mbutrovich
Copy link
Contributor

My initial thoughts:

@andygrove
Copy link
Member Author

andygrove commented Nov 22, 2024

Some progress!

Before

Screenshot from 2024-11-22 11-44-21

After

Screenshot from 2024-11-22 14-21-34

@andygrove
Copy link
Member Author

We now have metrics for all operators showing time for fetching batches from JVM.

Screenshot from 2024-11-22 15-35-34

@andygrove andygrove changed the title fix: [WIP] Stop dropping metrics fix: Stop dropping metrics and expose CopyExec and ScanExec in Spark SQL Metrics Nov 22, 2024
@andygrove andygrove changed the title fix: Stop dropping metrics and expose CopyExec and ScanExec in Spark SQL Metrics fix: Stop dropping metrics Nov 22, 2024
@andygrove andygrove requested review from viirya and comphead November 22, 2024 22:53
@parthchandra
Copy link
Contributor

Approach looks good (though I cannot say I understand it completely). The results are definitely what we wanted!

@andygrove
Copy link
Member Author

I can possibly break this down into some smaller PRs as well. I may do that.

@@ -365,28 +378,23 @@ struct ScanStream<'a> {
scan: ScanExec,
/// Schema representing the data
schema: SchemaRef,
/// Metrics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it dropped because it repeats what we have on SparkPlan?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reverted some of these changes now

partition: usize,
baseline_metrics: BaselineMetrics,
) -> Self {
pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize, jvm_fetch_time: Time) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps jvm_fetch_time enough for now, but if you wanna expand metrics in future its better to have a wrapper structure similar to BaselineMetrics ?

@andygrove andygrove changed the title fix: Stop dropping metrics fix: Various metrics bug fixes and improvements Nov 24, 2024
@codecov-commenter
Copy link

codecov-commenter commented Nov 24, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 34.32%. Comparing base (b74bfe4) to head (c77b144).
Report is 13 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1111      +/-   ##
============================================
- Coverage     34.33%   34.32%   -0.02%     
- Complexity      898      899       +1     
============================================
  Files           115      115              
  Lines         42986    43495     +509     
  Branches       9369     9494     +125     
============================================
+ Hits          14761    14930     +169     
- Misses        25361    25658     +297     
- Partials       2864     2907      +43     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@andygrove andygrove marked this pull request as draft November 29, 2024 18:31
@andygrove andygrove marked this pull request as ready for review November 30, 2024 15:27
@andygrove
Copy link
Member Author

I moved the FFI metrics changes out of this PR and into #1128

@andygrove
Copy link
Member Author

@comphead @viirya @parthchandra @mbutrovich this is ready for review now

let projection =
swap_hash_join(hash_join.as_ref(), PartitionMode::Partitioned)?;
let swapped_hash_join = Arc::clone(projection.children()[0]);
println!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is println! needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed, thanks

/// Spark plan ID which is passed down in the protobuf
pub(crate) plan_id: u32,
/// The root native plan that was generated for this Spark plan
pub(crate) native_plan: Arc<dyn ExecutionPlan>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so native_plan it is a Datafusion physical plan?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

#[derive(Debug, Clone)]
pub(crate) struct SparkPlan {
/// Spark plan ID which is passed down in the protobuf
pub(crate) plan_id: u32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the plan_id used somehow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or additional/child plans searches for the parent by the id?

Copy link
Member Author

@andygrove andygrove Dec 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was not being used yet, but I have now pushed a commit to include it in the "native explain" output, to make it easier to debug performance issues

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @andygrove
Probably as you do use the plan_id we should document how it is used.

Copy link
Contributor

@parthchandra parthchandra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@andygrove
Copy link
Member Author

Thanks for the reviews @parthchandra and @comphead. I am going to go ahead and merge but will be following up with some more changes since I just learned more and will need to refine these metrics to make them more useful.

@andygrove andygrove merged commit ebdde77 into apache:main Dec 2, 2024
74 checks passed
@andygrove andygrove deleted the stop-dropping-metrics branch December 2, 2024 18:26
andygrove added a commit that referenced this pull request Dec 20, 2024
* feat: support array_append (#1072)

* feat: support array_append

* formatted code

* rewrite array_append plan to match spark behaviour and fixed bug in QueryPlan serde

* remove unwrap

* Fix for Spark 3.3

* refactor array_append binary expression serde code

* Disabled array_append test for spark 4.0+

* chore: Simplify CometShuffleMemoryAllocator to use Spark unified memory allocator (#1063)

* docs: Update benchmarking.md (#1085)

* feat: Require offHeap memory to be enabled (always use unified memory) (#1062)

* Require offHeap memory

* remove unused import

* use off heap memory in stability tests

* reorder imports

* test: Restore one test in CometExecSuite by adding COMET_SHUFFLE_MODE config (#1087)

* Add changelog for 0.4.0 (#1089)

* chore: Prepare for 0.5.0 development (#1090)

* Update version number for build

* update docs

* build: Skip installation of spark-integration  and fuzz testing modules (#1091)

* Add hint for finding the GPG key to use when publishing to maven (#1093)

* docs: Update documentation for 0.4.0 release (#1096)

* update TPC-H results

* update Maven links

* update benchmarking guide and add TPC-DS results

* include q72

* fix: Unsigned type related bugs (#1095)

## Which issue does this PR close?

Closes #1067

## Rationale for this change

Bug fix. A few expressions were failing some unsigned type related tests

## What changes are included in this PR?

 - For `u8`/`u16`, switched to use `generate_cast_to_signed!` in order to copy full i16/i32 width instead of padding zeros in the higher bits
 - `u64` becomes `Decimal(20, 0)` but there was a bug in `round()`  (`>` vs `>=`)

## How are these changes tested?

Put back tests for unsigned types

* chore: Include first ScanExec batch in metrics (#1105)

* include first batch in ScanExec metrics

* record row count metric

* fix regression

* chore: Improve CometScan metrics (#1100)

* Add native metrics for plan creation

* make messages consistent

* Include get_next_batch cost in metrics

* formatting

* fix double count of rows

* chore: Add custom metric for native shuffle fetching batches from JVM (#1108)

* feat: support array_insert (#1073)

* Part of the implementation of array_insert

* Missing methods

* Working version

* Reformat code

* Fix code-style

* Add comments about spark's implementation.

* Implement negative indices

+ fix tests for spark < 3.4

* Fix code-style

* Fix scalastyle

* Fix tests for spark < 3.4

* Fixes & tests

- added test for the negative index
- added test for the legacy spark mode

* Use assume(isSpark34Plus) in tests

* Test else-branch & improve coverage

* Update native/spark-expr/src/list.rs

Co-authored-by: Andy Grove <agrove@apache.org>

* Fix fallback test

In one case there is a zero in index and test fails due to spark error

* Adjust the behaviour for the NULL case to Spark

* Move the logic of type checking to the method

* Fix code-style

---------

Co-authored-by: Andy Grove <agrove@apache.org>

* feat: enable decimal to decimal cast of different precision and scale (#1086)

* enable decimal to decimal cast of different precision and scale

* add more test cases for negative scale and higher precision

* add check for compatibility for decimal to decimal

* fix code style

* Update spark/src/main/scala/org/apache/comet/expressions/CometCast.scala

Co-authored-by: Andy Grove <agrove@apache.org>

* fix the nit in comment

---------

Co-authored-by: himadripal <hpal@apple.com>
Co-authored-by: Andy Grove <agrove@apache.org>

* docs: fix readme FGPA/FPGA typo (#1117)

* fix: Use RDD partition index (#1112)

* fix: Use RDD partition index

* fix

* fix

* fix

* fix: Various metrics bug fixes and improvements (#1111)

* fix: Don't create CometScanExec for subclasses of ParquetFileFormat (#1129)

* Use exact class comparison for parquet scan

* Add test

* Add comment

* fix: Fix metrics regressions (#1132)

* fix metrics issues

* clippy

* update tests

* docs: Add more technical detail and new diagram to Comet plugin overview (#1119)

* Add more technical detail and new diagram to Comet plugin overview

* update diagram

* add info on Arrow IPC

* update diagram

* update diagram

* update docs

* address feedback

* Stop passing Java config map into native createPlan (#1101)

* feat: Improve ScanExec native metrics (#1133)

* save

* remove shuffle jvm metric and update tuning guide

* docs

* add source for all ScanExecs

* address feedback

* address feedback

* chore: Remove unused StringView struct (#1143)

* Remove unused StringView struct

* remove more dead code

* docs: Add some documentation explaining how shuffle works (#1148)

* add some notes on shuffle

* reads

* improve docs

* test: enable more Spark 4.0 tests (#1145)

## Which issue does this PR close?

Part of #372 and #551

## Rationale for this change

To be ready for Spark 4.0

## What changes are included in this PR?

This PR enables more Spark 4.0 tests that were fixed by recent changes

## How are these changes tested?

tests enabled

* chore: Refactor cast to use SparkCastOptions param (#1146)

* Refactor cast to use SparkCastOptions param

* update tests

* update benches

* update benches

* update benches

* Enable more scenarios in CometExecBenchmark. (#1151)

* chore: Move more expressions from core crate to spark-expr crate (#1152)

* move aggregate expressions to spark-expr crate

* move more expressions

* move benchmark

* normalize_nan

* bitwise not

* comet scalar funcs

* update bench imports

* remove dead code (#1155)

* fix: Spark 4.0-preview1 SPARK-47120 (#1156)

## Which issue does this PR close?

Part of #372 and #551

## Rationale for this change

To be ready for Spark 4.0

## What changes are included in this PR?

This PR fixes the new test SPARK-47120 added in Spark 4.0

## How are these changes tested?

tests enabled

* chore: Move string kernels and expressions to spark-expr crate (#1164)

* Move string kernels and expressions to spark-expr crate

* remove unused hash kernel

* remove unused dependencies

* chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165)

* move CheckOverflow to spark-expr crate

* move NegativeExpr to spark-expr crate

* move UnboundColumn to spark-expr crate

* move ExpandExec from execution::datafusion::operators to execution::operators

* refactoring to remove datafusion subpackage

* update imports in benches

* fix

* fix

* chore: Add ignored tests for reading complex types from Parquet (#1167)

* Add ignored tests for reading structs from Parquet

* add basic map test

* add tests for Map and Array

* feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1169)

* Add Spark-compatible SchemaAdapterFactory implementation

* remove prototype code

* fix

* refactor

* implement more cast logic

* implement more cast logic

* add basic test

* improve test

* cleanup

* fmt

* add support for casting unsigned int to signed int

* clippy

* address feedback

* fix test

* fix: Document enabling comet explain plan usage in Spark (4.0) (#1176)

* test: enabling Spark tests with offHeap requirement (#1177)

## Which issue does this PR close?

## Rationale for this change

After #1062 We have not running Spark tests for native execution

## What changes are included in this PR?

Removed the off heap requirement for testing

## How are these changes tested?

Bringing back Spark tests for native execution

* feat: Improve shuffle metrics (second attempt) (#1175)

* improve shuffle metrics

* docs

* more metrics

* refactor

* address feedback

* Fix redundancy in Cargo.lock.

* Format, more post-merge cleanup.

* Compiles

* Compiles

* Remove empty file.

* Attempt to fix JNI issue and native test build issues.

* Test Fix

* Update planner.rs

Remove println from test.

---------

Co-authored-by: NoeB <noe.brehm@bluewin.ch>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Raz Luvaton <raz.luvaton@flarion.io>
Co-authored-by: Andy Grove <agrove@apache.org>
Co-authored-by: Parth Chandra <parthc@apache.org>
Co-authored-by: KAZUYUKI TANIMURA <ktanimura@apple.com>
Co-authored-by: Sem <ssinchenko@apache.org>
Co-authored-by: Himadri Pal <mehimu@gmail.com>
Co-authored-by: himadripal <hpal@apple.com>
Co-authored-by: gstvg <28798827+gstvg@users.noreply.github.com>
Co-authored-by: Adam Binford <adamq43@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants