cache branching subplans optimization prevents parallel concat of LazyFrames #17430

Closed
seemich opened this issue Jul 4, 2024 · 2 comments · Fixed by #17463
Assignees: ritchie46
Labels: accepted (Ready for implementation), bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), performance (Performance issues or improvements), python (Related to Python Polars)

Comments


seemich commented Jul 4, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import polars as pl

dfs = [pl.LazyFrame({"a": range(100_000_000)}) for i in range(16)]

df = pl.concat(dfs)

out1 = df.filter(pl.col("a") % 2 == 0).select(pl.col("a").mean())
out2 = df.filter(pl.col("a") % 2 == 0).select(pl.col("a").std())

print(pl.concat([out1, out2]).profile(comm_subplan_elim=True))
(shape: (2, 1)
┌───────────────────┐
│ a                 │
│ ---               │
│ f64               │
╞═══════════════════╡
│ 4.9999999e7       │
│ 28,867,513.477523 │
└───────────────────┘, shape: (19, 3)
┌──────────────────────┬───────────┬───────────┐
│ node                 ┆ start     ┆ end       │
│ ---                  ┆ ---       ┆ ---       │
│ str                  ┆ u64       ┆ u64       │
╞══════════════════════╪═══════════╪═══════════╡
│ optimization         ┆ 0         ┆ 247,465   │
│ simple-projection(a) ┆ 247,465   ┆ 247,466   │
│ simple-projection(a) ┆ 521,158   ┆ 521,160   │
│ simple-projection(a) ┆ 768,187   ┆ 768,188   │
│ simple-projection(a) ┆ 1,040,995 ┆ 1,040,996 │
│ simple-projection(a) ┆ 1,310,942 ┆ 1,310,943 │
│ simple-projection(a) ┆ 1,584,499 ┆ 1,584,500 │
│ simple-projection(a) ┆ 1,858,001 ┆ 1,858,002 │
│ simple-projection(a) ┆ 2,131,008 ┆ 2,131,009 │
│ simple-projection(a) ┆ 2,404,079 ┆ 2,404,080 │
│ simple-projection(a) ┆ 2,679,186 ┆ 2,679,187 │
...
│ simple-projection(a) ┆ 4,293,353 ┆ 4,293,354 │
│ select(a)            ┆ 4,293,379 ┆ 4,726,533 │
│ select(a)            ┆ 4,726,540 ┆ 6,347,674 │
└──────────────────────┴───────────┴───────────┘)

Log output

No response

Issue description

CPU usage is at 100% in this case: the filters on each of the 16 df segments run in series. If we instead run with comm_subplan_elim=False we get the expected behaviour of 1600% CPU usage, and the overall run time is reduced by about half.
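
As a workaround, the common-subplan-elimination cache can be disabled per query. A minimal sketch, reusing out1/out2 from the reproducible example above (the flag is the only thing changed):

# Workaround sketch: skip the subplan cache so the 16 unioned segments
# are filtered in parallel again, at the cost of evaluating the shared
# subplan once per branch.
result, timings = pl.concat([out1, out2]).profile(comm_subplan_elim=False)

# The same flag is accepted by collect() when profiling is not needed.
df_out = pl.concat([out1, out2]).collect(comm_subplan_elim=False)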

Our actual use case involves concatenated LazyFrames from scan_ipc which go through a join_asof. We branch into two related queries before concatenating the result (just like in this example). In that scenario the runtime is reduced substantially (5-8x) by running with comm_subplan_elim=False, thanks to the parallelism. However, it still takes twice as long as a single query would, since the scan_ipc, filtering, and joining all have to happen twice.
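
For context, a rough sketch of that real-world shape (the file paths, the join key "ts", the column "value", and the aggregations are hypothetical placeholders, not the actual query):

# Hypothetical sketch of the scan_ipc + join_asof pattern described above.
segments = [pl.scan_ipc(f"data/segment_{i}.arrow") for i in range(16)]  # placeholder paths
base = pl.concat(segments).join_asof(pl.scan_ipc("data/reference.arrow"), on="ts")

branch1 = base.select(pl.col("value").mean())
branch2 = base.select(pl.col("value").std())

# With comm_subplan_elim=True the shared scan/join subplan is cached but runs
# serially; with comm_subplan_elim=False it runs in parallel but is executed twice.
out = pl.concat([branch1, branch2]).collect()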

I don't know if this is a known limitation, but it would be great to have the subplan branch cache working in combination with parallel concat.

For the simplified example presented here, reduced to 3 source DataFrames instead of 16, here is the query plan without the subplan branch cache:

UNION
  PLAN 0:
     SELECT [col("a").sum()] FROM
      UNION
        PLAN 0:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        PLAN 1:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        PLAN 2:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
      END UNION
  PLAN 1:
     SELECT [col("a").mean().strict_cast(Int64)] FROM
      UNION
        PLAN 0:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        PLAN 1:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        PLAN 2:
          DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
      END UNION
END UNION

And here it is with the branch cache:

UNION
  PLAN 0:
     SELECT [col("a").sum()] FROM
      CACHE[id: 0, cache_hits: 1]
        UNION
          PLAN 0:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
          PLAN 1:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
          PLAN 2:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        END UNION
  PLAN 1:
     SELECT [col("a").mean().strict_cast(Int64)] FROM
      CACHE[id: 0, cache_hits: 1]
        UNION
          PLAN 0:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
          PLAN 1:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
          PLAN 2:
            simple π 1/1 ["a"]
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: [([(col("a")) % (2)]) == (0)]
        END UNION
END UNION
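
For reference, both plans can be printed without executing the query; a sketch, assuming dfs is reduced to 3 LazyFrames as in the plans above:

# Print the optimized plan with and without the subplan branch cache.
query = pl.concat([out1, out2])
print(query.explain(comm_subplan_elim=False))  # duplicated UNION subtrees (first plan)
print(query.explain(comm_subplan_elim=True))   # shared CACHE[id: 0] node (second plan)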

Expected behavior

Expect the query to run in parallel: higher CPU usage, and overlapping start and end times in .profile().
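
One way to check this programmatically is to look for overlapping node intervals in the timings frame returned by .profile(); a sketch, reusing out1/out2 from the example above:

# Sketch: with serial execution every node interval ends before the next starts,
# so any next-start earlier than the current end indicates parallel overlap.
result, timings = pl.concat([out1, out2]).profile(comm_subplan_elim=True)
has_overlap = timings.sort("start").select(
    (pl.col("start").shift(-1) < pl.col("end")).any().alias("any_overlap")
)
print(has_overlap)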

Installed versions

--------Version info---------
Polars:               1.0.0
Index type:           UInt64
seemich added the bug, needs triage, and python labels on Jul 4, 2024
alexander-beedie added the performance label on Jul 5, 2024
ritchie46 self-assigned this on Jul 6, 2024

seemich commented Jul 6, 2024

Fantastic, thanks! I can confirm that #17463 fixes parallelism for our real-world scenario with scan_ipc sourced dataframes.

ritchie46 (Member) commented

Great! :)

Release coming up.

c-peters added the accepted label on Jul 8, 2024