
perf: Combine small chunks in sinks for streaming pipelines #14346

Merged

Conversation

@itamarst itamarst commented Feb 7, 2024

Fixes #11699

Small chunks add significant overhead. This PR merges them in the sink. If there are lots of small chunks, this should make things faster. If there are lots of large chunks, it adds a little overhead, but the cost is fixed and small per chunk, so that's fine. There may, however, be a slowdown in some in-between edge cases where the chunk merging involves copying extra data.

This doesn't fix the fact that small chunks are being generated, though... I will file a separate PR for the specific edge case in the reproducer script, and file an issue for another likely case if I can reproduce it.
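As a rough sketch of the approach (illustrative only, not the PR's actual code: the type name, threshold, and method names below are invented, while vstack_mut, height, and as_single_chunk_par are existing polars-core DataFrame methods), small incoming frames are stacked into an accumulator and only handed onward once enough rows have piled up:

use polars_core::prelude::*;

// Illustrative threshold; the real value is a tuning decision.
const MIN_ROWS_PER_EMITTED_CHUNK: usize = 50_000;

#[derive(Default)]
struct SmallChunkCombiner {
    acc: Option<DataFrame>,
}

impl SmallChunkCombiner {
    /// Buffer one incoming chunk; return a combined DataFrame once enough
    /// rows have accumulated, otherwise keep buffering.
    fn push(&mut self, df: DataFrame) -> PolarsResult<Option<DataFrame>> {
        match self.acc.take() {
            Some(mut acc) => {
                // vstack_mut appends the incoming frame's chunks without
                // copying any data yet.
                acc.vstack_mut(&df)?;
                self.acc = Some(acc);
            },
            None => self.acc = Some(df),
        }
        let big_enough = self
            .acc
            .as_ref()
            .map_or(false, |acc| acc.height() >= MIN_ROWS_PER_EMITTED_CHUNK);
        if big_enough {
            let mut out = self.acc.take().unwrap();
            // The copy happens here: merge the stacked small chunks into one
            // contiguous chunk, parallelised over columns.
            out.as_single_chunk_par();
            Ok(Some(out))
        } else {
            Ok(None)
        }
    }

    /// Flush whatever is left when the stream ends.
    fn finish(&mut self) -> Option<DataFrame> {
        self.acc.take()
    }
}

A sink would call push() for each incoming chunk and finish() at the end of the stream; chunks that are already large pass through with only the cheap height check.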

Benchmark

Before:

$ python ../11699.py 
SINK 2.8767316341400146
COLLECT+WRITE 0.9260973930358887
STREAMING COLLECT+WRITE 4.061856031417847

After:

$ python ../11699.py
SINK 1.064814805984497
COLLECT+WRITE 0.9025135040283203
STREAMING COLLECT+WRITE 1.1788344383239746

Using this script:

from datetime import datetime

import polars as pl

start_datetime = datetime(2020, 1, 1)
end_datetime = datetime(
    2024,
    1,
    1,
)
N_ids = 10
interval = "1m"
# Cross-join ~2.1 million timestamps (4 years at 1-minute resolution) with
# 10 ids, giving roughly 21 million rows.
df = (
    pl.LazyFrame(
        {
            "time": pl.datetime_range(
                start_datetime, end_datetime, interval=interval, eager=True
            )
        }
    )
    .join(
        pl.LazyFrame(
            {"id": [f"id{i:05d}" for i in range(N_ids)], "value": list(range(N_ids))}
        ),
        how="cross",
    )
    .select("time", "id", "value")
)

# print(df.profile(streaming=True))
from time import time

# Time the three write paths: streaming sink, eager collect + write,
# and streaming collect + write.
start = time()
df.sink_parquet("/tmp/out.parquet")
print("SINK", time() - start)

start = time()
df.collect().write_parquet("/tmp/out2.parquet")
print("COLLECT+WRITE", time() - start)

start = time()
df.collect(streaming=True).write_parquet("/tmp/out3.parquet")
print("STREAMING COLLECT+WRITE", time() - start)

@github-actions github-actions bot added performance Performance issues or improvements python Related to Python Polars rust Related to Rust Polars labels Feb 7, 2024
@itamarst itamarst changed the title from "perf: Combine small chunks in sinks for streaming pipelines (#11699)" to "perf: Combine small chunks in sinks for streaming pipelines" on Feb 7, 2024
itamarst commented Feb 7, 2024

Updates:

  1. The failing test is plausibly not this PR's fault?
  2. I tried to see if filtering could result in empirically measurable slowness. My initial attempt failed to show any effects, so I won't be filing a follow-up.

@ritchie46 ritchie46 left a comment

Nice improvement. I have left some comments.

Resolved review threads (now outdated) on crates/polars-pipe/Cargo.toml and crates/polars-pipe/src/operators/chunks.rs.
@itamarst itamarst (Contributor, Author) commented:
@ritchie46 OK hopefully I've finally understood you, sorry it took so long. If I didn't, please feel free to finish this PR.

Final numbers:

SINK 1.0646779537200928
COLLECT+WRITE 0.9343609809875488
STREAMING COLLECT+WRITE 1.083845853805542

I.e. the collect(streaming=True).write_parquet() case is slightly faster due to the as_single_chunk_par() change.
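(For context, as_single_chunk_par() is an existing polars-core DataFrame method that rechunks every column into a single contiguous chunk, parallelising the work across columns. A hedged illustration of the kind of call site meant here, not code from this PR:)

use polars_core::prelude::*;

// Illustrative only: a streaming collect can leave the frame fragmented
// into many tiny chunks; merging them once up front avoids per-chunk
// overhead in whatever consumes the frame next.
fn compact(mut df: DataFrame) -> DataFrame {
    println!("chunks before: {}", df.n_chunks());
    df.as_single_chunk_par();
    println!("chunks after: {}", df.n_chunks()); // 1
    df
}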

@@ -57,32 +55,7 @@ impl Sink for OrderedSink {
self.sort();

let chunks = std::mem::take(&mut self.chunks);
let mut combiner = StreamingVstacker::default();
ritchie46 (Member) commented on this diff:
If we wanted to rechunk, we could simply rechunk here. But I don't want to do that, as that should be left to the consumer of the streaming engine.

itamarst (Contributor, Author) replied:
In that case it should probably be done in write_parquet(); otherwise the collect(streaming=True).write_parquet() case will continue to be slow.

itamarst (Contributor, Author) replied:
Which would require the new struct to be moved into, e.g., the polars-core crate and made public.

Here's the runtime with the latest commit on my computer; the last case is slow again:

SINK 1.0721302032470703
COLLECT+WRITE 0.9235994815826416
STREAMING COLLECT+WRITE 4.254129648208618

itamarst (Contributor, Author) replied:
But maybe also write_feather(), or the cloud parquet writer, etc. (Having it in OrderedSink seemed like a low-cost smoothing of performance bumps, limited to a single place.)

ritchie46 (Member) replied:
Then that logic should indeed be in write_parquet. That writer should check the chunk sizes.

I will first merge this one, and then we can follow up with the write_parquet optimization.
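A hedged sketch of what that writer-side check could look like (illustrative names and threshold; not the actual polars writer code):

use polars_core::prelude::*;

// Illustrative only: rechunk before writing, but only when the frame is
// fragmented into many small chunks, so the common single-chunk case
// pays nothing.
fn maybe_rechunk_for_writer(df: &mut DataFrame) {
    let n_chunks = df.n_chunks();
    if n_chunks > 1 && df.height() / n_chunks < 50_000 {
        // Many small chunks: pay the copy cost once up front instead of
        // per chunk inside the writer.
        df.as_single_chunk_par();
    }
}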

ritchie46 (Member), quoting the comment above, replied:

But maybe also write_feather(), or the cloud parquet writer, etc. (Having it in OrderedSink seemed like a low-cost smoothing of performance bumps, limited to a single place.)

Yes, but it is more expensive for other operations. Operations themselves should know their best chunking strategies.

@ritchie46 ritchie46 (Member) commented:
Thanks for this @itamarst. I think this can be used by more writers. Are you interested in the follow-up PR?

@ritchie46 ritchie46 merged commit 921ddea into pola-rs:main Feb 13, 2024
17 checks passed
@itamarst itamarst deleted the 11699-combine-small-chunks-when-writing branch February 13, 2024 23:49
@itamarst itamarst (Contributor, Author) commented:
Thanks for all the feedback and help, and yes, I'm happy to do a follow-up PR for write_parquet().

Labels
performance Performance issues or improvements python Related to Python Polars rust Related to Rust Polars
Closes: sink_parquet(...) much slower than .collect().write_parquet(...) (#11699)