Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable retries on connection reset by peer when doing scan_parquet against an object store #14384

Closed
kszlim opened this issue Feb 9, 2024 · 25 comments
Labels
enhancement New feature or an improvement of an existing feature

Comments

@kszlim
Copy link
Contributor

kszlim commented Feb 9, 2024

Description

When querying a large s3 dataset with over 10000 files of size ~300MB, scan parquet will fail with high probability without any retry because of connection reset by peer, this is true even though I'm querying an s3 bucket in the same availability zone from an ec2 instance.

I get:

Generic S3 error: Error after 0 retries in 221.182308ms, max_retries:10, retry_timeout:10s, source:error sending request for url (https://s3.us-west-2.amazonaws.com/some_bucket/some_parquet_id.parquet): connection error: Connection reset by peer (os error 104)

This is fine when querying less files, but with many files this almost invariably happens.

A strategy mentioned in discord by @alexander-beedie that would be useful to implement and is basically absolutely necessary for polars to scale to very large cloud datasets could be:

This is an interesting one, and something I had to account for back in my days at JPM - I had written a multithreaded load method for our in-house dataframe (also mine 😅) from potentially thousands of files on a remote web server. given that a large risk run might generate as many as 20,000 files the chances that every one of them would load successfully every time was *very* low. i was writing C++ back then, but the solution (taking advantage of libcurl) was to have the downloaders draw from the pool of files, tracking two failure conditions - the total number of failures of the pool as a whole, and the number of failures of specific files. if a single file failed, you would increment the total failure count of both the pool and the file by one. each file was allowed to fail three times (each time being put back in the download pool) before the download as a whole would be considered failed, and the pool was allowed to register up to 50 total failures. this kept things very solid - only genuine/persistent network issues would tend to trigger an overall failure - transient failures would just be absorbed/handled transparently...
@kszlim kszlim added the enhancement New feature or an improvement of an existing feature label Feb 9, 2024
@kszlim
Copy link
Contributor Author

kszlim commented Feb 10, 2024

Just fyi, i've opened an upstream issue in object store, i'm not sure whether it should be fixed here at the caller level or in the upstream library, i'm for now leaning towards it being something that should be made more robust at the object_store level, though I believe the strategy suggested by alexander-beedie should still be implemented for robustness in polars execution overall.

@ritchie46
Copy link
Member

Have you tried setting the retry to non-zero?

@kszlim
Copy link
Contributor Author

kszlim commented Feb 10, 2024

Yeah, I set max retries to 10, you can see it in the failure message, it just doesn't retry on this failure type.

@ritchie46
Copy link
Member

Is there a repro on a public dataset that I could use to trigger this? I want to fix this.

@kszlim
Copy link
Contributor Author

kszlim commented Feb 11, 2024

I don't have one unfortunately, though you could likely reproduce it by creating ~10000 parquet files of of roughly 200 columns of floats of roughly 300MB each and then just doing some simple queries against them from within the same AWS AZ.

@ritchie46
Copy link
Member

ritchie46 commented Feb 11, 2024

What is your query? Eager/ streaming/ projection pushdown. Please, help me a bit. As the issue description states, try to make a code snippet that reproduces it.

@kszlim
Copy link
Contributor Author

kszlim commented Feb 11, 2024

What is your query? Eager/ streaming/ projection pushdown. Please, help me a bit. As the issue description states, try to make a code snippet that reproduces it.

It's just a select on several physical parquet columns, there's no extra computation or filtering.

Happens in both streaming and the default engine. It's not reproducible without putting a large dataset in s3 which I unfortunately cannot share.

@kszlim
Copy link
Contributor Author

kszlim commented Feb 15, 2024

@ritchie46 here's the repro case you wanted:

import numpy as np
import polars as pl
import time

NUM_SAMPLES = 400000
NUM_COLS = 100
ROW_SIZE = 100000
NUM_FILES = 10000
df_data = {}

col_prefix = "test_col"
for col_num in range(NUM_COLS):
    df_data[f"{col_prefix}-{col_num}"] = np.random.random(NUM_SAMPLES)

for file_num in range(NUM_FILES):
    start = time.time()

    start = time.time()
    pl.DataFrame(df_data).with_columns(id = pl.lit(file_num)).write_parquet(f"{file_num}.parquet", row_group_size=50000, compression="snappy", statistics=True)
    print(f"Dumped: {file_num}.parquet to disk")

################# Query
PATH_TO_FILES = "s3://YOUR_BUCKET/prefix/*.parquet"
ldf = pl.scan_parquet(PATH_TO_FILES)
ldf.select(pl.col(f"{col_prefix}-0").mean()).collect()

Tested it with polars 0.20.8 and it still triggers for me too (i'm running on a c6a.24xlarge within the same s3 region with the default polars settings/config, I suspect a larger node might even reproduce it more reliably .

@kszlim
Copy link
Contributor Author

kszlim commented Feb 15, 2024

Interestingly the error msg isn't always consistent too, got it failing with:
polars.exceptions.ComputeError: Generic S3 error: request or response body error: error reading a body from connection: end of file before message length reached sometimes (though i manually verified when that file is downloaded locally that there were no issues with it).

@ritchie46
Copy link
Member

Thanks. I have some vacation days, but I hope to get to this.

@kszlim
Copy link
Contributor Author

kszlim commented Feb 15, 2024

Also interestingly I get this:
polars.exceptions.ComputeError: Generic S3 error: request or response body error: error reading a body from connection: Connection reset by peer (os error 104) error more often when I run it against the normal s3 buckets, the error above occurs more often when I run against the express zone one bucket.

Thanks @ritchie46 please enjoy your vacation

Kept running it more and then I get:
polars.exceptions.ComputeError: Generic S3 error: Error after 0 retries in 222.175µs, max_retries:10, retry_timeout:10s, source:error sending request for url (https://s3.us-west-2.amazonaws.com/bucket/prefix/data.parquet): connection error: Connection reset by peer (os error 104)

This suggests to me that the error isn't always in the code path with the retry mechanism (something I observed while running my fork of object store with the patch that I made is that the error above (the last one i just posted) doesn't occur anymore, but I still get connection reset by peer (but without any "Error after 0 retries ..."). This suggests that not everything is being appropriately guarded by the retry (even though these errors aren't being retried with the current retry behavior).

Curious if https://github.com/apache/arrow-rs/blob/master/object_store/src/aws/client.rs#L600 might be a potential culprit for the cases that aren't retry guarded... @tustvold? Because everything else that might make a get request that I see within object store seems to be guarded by retries.

Update:
Very interestingly setting:

        os.environ["POLARS_PREFETCH_SIZE"] = "4096"
        os.environ["POLARS_CONCURRENCY_BUDGET"] = "2000"

makes the queries both more reliable and much faster (on the io side, I get 10-20x more network throughput with a ridiculously high concurrency budget)

@tustvold
Copy link

tustvold commented Feb 15, 2024

We don't retry once response streaming has started, this could be added but would be a relatively non-trivial piece of logic.

As it stands this appears to be an issue specific to polars, and in particular the way it performs IO. The fact setting a prefetch helps, makes me even more confident you are flooding the network. Given this, the fact adding retries doesn't resolve the error, and the creator of hyper advised against this, I'm very lukewarm on making changes to the retry logic in object_store.

My recommendation would still be to try using LimitStore, and failing that use network analysis tools like VPC flow logs or tcpdump to get a sense of what is going on.

I would also be interested if datafusion runs into the same issues, as this might be a useful datapoint.

I am on holiday for the next 2-3 weeks, but will try to respond sporadically

@kszlim
Copy link
Contributor Author

kszlim commented Feb 15, 2024

Also noticed that this can occur even when the current network throughput goes near zero (on the orders of ~5 KB/s), implying that maybe it's actually got something to do with reading the body of these http requests that we're not completing?

When going through pyarrow's dataset, i don't have any network issues (but it's dramatically slower ~20MB/s vs a peak of 1.5-2GB/s).

@ritchie46
Copy link
Member

@tustvold currently Polars has a single object store per file. That's why we have the global semaphore logic. Is it possible to load a a dataset from a single object-store? Then we could use LimitStore.

Though we still need a semaphore to throttle multiple queries running a the same time. To my understanding the Limitstore, should have a similar effect as the global semaphore budget.

@tustvold
Copy link

tustvold commented Feb 16, 2024

It is expected that you would create an ObjectStore once and use it multiple times, this allows for things like connection pooling to work correctly. DataFusion has an ObjectStoreRegistry for this purpose. Creating a store per object is definitely sub-optimal

@ritchie46
Copy link
Member

Alright, so that's possible. I shall take a look then. 👀

@kszlim
Copy link
Contributor Author

kszlim commented Mar 14, 2024

On 0.20.15 i still intermittently run into:

polars.exceptions.ComputeError: Generic S3 error: Error after 0 retries in 13.556699233s, max_retries:10, retry_timeout:10s, source:error sending request for url (https://s3.us-west-2.amazonaws.com/path/to/file.parquet): error trying to connect: operation timed out

It's rare, but occurs when querying my large dataset.

@tustvold
Copy link

tustvold commented Mar 14, 2024

That looks to be a different error. This could point to a couple of things, from some network throttling or futures not being polled in a timely manner. I'm not familiar with how polars has opted to hook up tokio, but it is important to keep CPU-bound tasks such as decoding parquet off the threadpool used for performing IO.

There is some further context on apache/arrow-rs#5366

@ritchie46
Copy link
Member

Hmm.. that's an interesting observation @tustvold. The decoding is indeed in an async. I will see if I can push that on the rayon thread pool and await the result on a queue. @kszlim here we made this change. Can you give it a spin? #15083

@kszlim
Copy link
Contributor Author

kszlim commented Mar 15, 2024

I'll give it a try, probably won't get around to it until Monday though.

@kszlim
Copy link
Contributor Author

kszlim commented Mar 18, 2024

Gave it a go, hard to know if it solves my issue (seems to work), but the issue was pretty rare in the first place. Nice side effect is that it seems to improve core utilization a little and my performance on a simple query:

select 3 columns (of 200) out of 10k files with filter on id == 1000 where each file has a unique id, the code seemed to have sped up about 15% (my query goes from 83s to 70s consistently when running against s3).

I believe this could be improved even further, as having a oneshot channel where the parallelization is only across a few RGs probably has more overhead as opposed to having a MPSC channel where all the readers send all their bytes chunks to a single thread which then reads over all of them in a rayon pool would probably get better utilization, but i expect this probably requires a fairly large change to the existing code.

@kszlim
Copy link
Contributor Author

kszlim commented Mar 18, 2024

Would it also make sense doing the same thing for the metadata deserialization?

https://github.com/pola-rs/polars/blob/main/crates/polars-io/src/parquet/async_impl.rs#L164C39-L164C59

Ie. send all the metadata bytes into a separate thread through a mpsc channel and then do all the decoding in parallel (instead of in tokio tasks)?

@ritchie46
Copy link
Member

I believe this could be improved even further, as having a oneshot channel where the parallelization is only across a few RGs probably has more overhead as opposed to having a MPSC channel where all the readers send all their bytes chunks to a single thread

The row groups are already downloaded in an mpsc queue. The oneshot channel allows us toawait that task and then other tasks can run. I think it should work as intended now.

Yes, we could do that for the metadata as well, but I don't expect the metadata deserialization to block long.

In any case, great to hear about the improvements! :)

@kszlim
Copy link
Contributor Author

kszlim commented Mar 18, 2024

The row groups are already downloaded in an mpsc queue. The oneshot channel allows us toawait that task and then other tasks can run. I think it should work as intended now.

Ah, i mean stuff gets downloaded async in tokio tasks (each holding a MPSC sender), byte slices get sent to a single thread through those senders, that thread then runs the slices through a rayon pool to decode.

I just suspect that parallelism isn't being maximally utilized, as the simple query i'm doing, select 3/200 columns + filter by row, doesn't seem to saturate my CPU (even when I set prefetch size os.environ["POLARS_PREFETCH_SIZE"] = "12000".

The query looks something like this:

`ldf.select(pl.col("col1"), pl.col("col2"), pl.col("col3")).filter(id=1000).collect()` and there's one id per file.

This results in me spending maybe 30s doing network io, and then 40 seconds doing decoding (without pinning all my cores to 100%).

@kszlim
Copy link
Contributor Author

kszlim commented Apr 1, 2024

Going to close this as it seems like it's fixed for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature
Projects
None yet
Development

No branches or pull requests

3 participants