
[Parquet][Python] Variable download speed from threads when reading S3 #38664

Closed

eeroel opened this issue Nov 10, 2023 · 15 comments

eeroel (Contributor) commented Nov 10, 2023

Describe the enhancement requested

Hi,

When reading Parquet concurrently from S3 using Arrow's S3 filesystem implementation with FileSystemDataset, I often observe that some threads download data much slower than others. The downloads also don't seem to get any faster when other threads complete, which I would expect if it were just network saturation (but networking is not my strong suit). As a result, reading a table is often slow with high variability, and using pre_buffer=True can sometimes hamper performance because individual threads download larger chunks.

Here's an example of cumulative data fetched versus time, using nyc-taxi data, with four equal-sized chunks being downloaded from four threads. Two of the downloads take > 2x the time of the two faster ones. The data is extracted from lines in the S3 debug output with "bytes written".
(figure: cumulative MB downloaded per thread versus time)

I'm running macOS 14.1 on Apple Silicon.

Here's the code that reproduces this issue on my system, and writes the logs:

import time

import pyarrow
import pyarrow.fs
import pyarrow.parquet
import pyarrow.dataset as ds
import pyarrow._s3fs
from pyarrow.dataset import FileSystemDataset, ParquetFileFormat, ParquetFragmentScanOptions

# trace-level S3 logging so the per-thread "bytes written" lines show up in the output
pyarrow._s3fs.initialize_s3(pyarrow._s3fs.S3LogLevel.Trace)
pyarrow.set_io_thread_count(100)

fs = pyarrow.fs.S3FileSystem(region="us-east-2")
fs = pyarrow.fs.SubTreeFileSystem("ursa-labs-taxi-data", fs)

format = ParquetFileFormat(
    default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
)

# one fragment is enough to reproduce
fragments = [format.make_fragment(x, filesystem=fs) for x in ["2018/01/data.parquet"]]

# use a different file for the schema
schema = pyarrow.parquet.read_schema("2019/01/data.parquet", filesystem=fs)
fsd = FileSystemDataset(fragments, schema, format, fs)

t = time.time()
# read all data so the pre-buffered chunks are large
fsd.to_table(
    # columns=["passenger_count"],
    # filter=ds.field("passenger_count") >= 5,
    batch_readahead=100,
    fragment_readahead=1,
)
print(time.time() - t)

With that script in perf_testing.py I filter the logs like so:
python perf_testing.py > out.log && cat out.log | grep "bytes written" > out_filtered.log

And here is the code to produce the plot from the filtered log:

import pandas as pd
from matplotlib import pyplot as plt

df = pd.read_csv("out_filtered.log", sep=' ', header=None)
df = df.assign(
    ts=lambda x: pd.to_datetime(x[1]+"T"+x[2]), 
    bytes=lambda x: x[5], thread=lambda x: x[4],
    mb = lambda x: x["bytes"]/1024/1024
)[["ts", "thread", "mb"]]

df = df.sort_values("ts")

df = df.groupby("thread").agg(cum_mb=("mb", "cumsum")).join(df)
fig, ax = plt.subplots(figsize=(8,6))
plt.xlabel("Time")
plt.ylabel("MB")
for thread, group in df.groupby("thread"):
    group.plot(x="ts", y="cum_mb", ax=ax, label=thread, alpha=0.5)
plt.savefig("out.png")

Component(s)

Parquet, Python

eeroel changed the title from "Variable download speed from threads when reading S3" to "[Parquet][Python] Variable download speed from threads when reading S3" on Nov 10, 2023

mapleFU (Member) commented Nov 10, 2023

Currently the default implementation uses LazyDefaults; would you mind switching back to pyarrow 13 with pre_buffer=True, or configuring the prefetch options to Defaults rather than LazyDefaults?
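
For later readers, here is a minimal sketch of the Defaults-vs-LazyDefaults switch at the Python level. It assumes a pyarrow version that exposes CacheOptions and a cache_options argument on ParquetFragmentScanOptions; at the time of this thread the equivalent change had to be made in C++ (parquet/properties.h), as tried in the next comment.

# Hypothetical sketch: request eager prefetching (CacheOptions::Defaults)
# instead of the lazy defaults. Assumes pyarrow exposes CacheOptions and
# ParquetFragmentScanOptions(cache_options=...); older versions do not.
import pyarrow as pa
from pyarrow.dataset import ParquetFileFormat, ParquetFragmentScanOptions

eager = pa.CacheOptions(lazy=False)  # lazy=True corresponds to LazyDefaults
format = ParquetFileFormat(
    default_fragment_scan_options=ParquetFragmentScanOptions(
        pre_buffer=True, cache_options=eager
    ),
)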

eeroel (Contributor, Author) commented Nov 10, 2023

Currently the default implementation uses LazyDefaults; would you mind switching back to pyarrow 13 with pre_buffer=True, or configuring the prefetch options to Defaults rather than LazyDefaults?

Tried this now, setting cache_options_(::arrow::io::CacheOptions::Defaults()) in parquet/properties.h, but I don't see a difference in behavior.

mapleFU (Member) commented Nov 10, 2023

Sigh, the fragment read code can take multiple paths, and they keep being hacked on: the scanner, CacheOptions, the Parquet file stats... It's hard to debug without local logs...

Maybe we should add more documentation for that...

eeroel (Contributor, Author) commented Nov 10, 2023

Sigh, the fragment read code can take multiple paths, and they keep being hacked on: the scanner, CacheOptions, the Parquet file stats... It's hard to debug without local logs...

Maybe we should add more documentation for that...

I get similar results with parquet.read_table; I wonder if that helps narrow it down? EDIT: Never mind, it looks like read_table uses Dataset under the hood.
(screenshot: similar per-thread cumulative download plot when using parquet.read_table)

eeroel (Contributor, Author) commented Nov 11, 2023

After some research, and based on how the curves look, I think the most likely cause is network congestion control. It's odd to see such dramatic differences between connections and it doesn't feel optimal, but I guess this is related to my network quality and not something that can be improved in Arrow?

eeroel (Contributor, Author) commented Nov 11, 2023

For what it's worth, this same .to_table call takes 15 seconds (so throughput is around 8 MB/s) on Google Colab, which has 500 MB/s download bandwidth. I don't have plots from there, but I suspect it might have a similar issue with congestion control being too aggressive (if that's what's happening). I wonder if there are any strategies to optimize this?

eeroel (Contributor, Author) commented Nov 11, 2023

I suspect this might be the issue: all of the connections are made to the same IP address (except the first one, which I believe is from the initial schema call), so the load isn't properly distributed across S3. So there's some DNS caching going on; the question is how to disable it? https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/horizontal-scaling-and-request-parallelization-for-high-throughput.html

[DEBUG] 2023-11-11 14:53:16.183 CURL [0x1e5c89ec0] (Text) Connected to ursa-labs-taxi-data.s3.us-east-2.amazonaws.com (52.219.106.242) port 443 (#0)
[DEBUG] 2023-11-11 14:53:17.820 CURL [0x16bb8b000] (Text) Connected to ursa-labs-taxi-data.s3.us-east-2.amazonaws.com (52.219.105.114) port 443 (#0)
[DEBUG] 2023-11-11 14:53:17.827 CURL [0x16bc17000] (Text) Connected to ursa-labs-taxi-data.s3.us-east-2.amazonaws.com (52.219.105.114) port 443 (#0)
[DEBUG] 2023-11-11 14:53:17.827 CURL [0x16baff000] (Text) Connected to ursa-labs-taxi-data.s3.us-east-2.amazonaws.com (52.219.105.114) port 443 (#0)
[DEBUG] 2023-11-11 14:53:17.827 CURL [0x16bca3000] (Text) Connected to ursa-labs-taxi-data.s3.us-east-2.amazonaws.com (52.219.105.114) port 443 (#0)
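
A quick standard-library check (not from the original thread) to confirm that the endpoint does resolve to multiple S3 IPs, i.e. that the single-IP pattern above comes from client-side resolution caching rather than from DNS itself:

import socket

# hostname taken from the curl log above
host = "ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for _ in range(5):
    # each lookup can return a different subset of the S3 address pool
    ips = sorted({info[4][0] for info in socket.getaddrinfo(host, 443, proto=socket.IPPROTO_TCP)})
    print(ips)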

eeroel (Contributor, Author) commented Nov 12, 2023

Did some benchmarking against AWS CLI (aws s3 cp):

  • The AWS CLI also connected to only one IP address, so that's probably fine and not a bottleneck at these rates. I also tested setting CURLOPT_DNS_SHUFFLE_ADDRESSES (https://curl.se/libcurl/c/CURLOPT_DNS_SHUFFLE_ADDRESSES.html), but saw no performance improvement even though connections were made to different IPs.
  • The AWS CLI is 15-20% faster on my computer, so there could be some room for optimization in the pre-buffer / cache parameters, but it's not a major difference. Interestingly, the CLI downloads the file mostly in 9 MB or 18 MB chunks, with the 9 MB chunks at half the rate of the 18 MB ones. (A rough way to measure raw range-request throughput from Python is sketched after this list.)
  • I mentioned above that Colab took 15 s to download the file, but that must have been an instance outside the US; on another instance I get 2-4 s download times (both with the AWS CLI and pyarrow 13/14).
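
A rough way to measure raw range-request throughput from Python (this is not the benchmark above; it is a sketch that uses only the public pyarrow.fs API and the same taxi-data file, bypassing the Parquet reader entirely):

import time
from concurrent.futures import ThreadPoolExecutor

import pyarrow.fs

N = 4  # number of parallel range requests
fs = pyarrow.fs.S3FileSystem(region="us-east-2")
path = "ursa-labs-taxi-data/2018/01/data.parquet"
size = fs.get_file_info(path).size
chunk = size // N

def fetch(i):
    # each thread opens its own handle and reads one contiguous slice
    start = i * chunk
    length = size - start if i == N - 1 else chunk
    with fs.open_input_file(path) as f:
        f.seek(start)
        return len(f.read(length))

t = time.time()
with ThreadPoolExecutor(N) as pool:
    total = sum(pool.map(fetch, range(N)))
elapsed = time.time() - t
print(f"{total / 1e6:.1f} MB in {elapsed:.1f} s ({total / 1e6 / elapsed:.1f} MB/s)")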

mapleFU (Member) commented Nov 15, 2023

#38664 (comment)

Some of this can be controlled in AWS EC2 environments using export ...; I think trying that might help.

eeroel (Contributor, Author) commented Nov 15, 2023

#38664 (comment)

Some of this can be controlled in AWS EC2 environments using export ...; I think trying that might help.

Hmm, if you mean AWS_EC2_METADATA_DISABLED, I think that's related to determining the region? My understanding is that the resolution of everything to a single IP comes from libcurl (as the shuffle-flag experiment showed).

I think I will just close this issue and open a new one if I have something more concrete...

eeroel closed this as completed on Nov 15, 2023

mapleFU (Member) commented Nov 15, 2023

I mean that maybe some of the flags in https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html would help, but I didn't test them. The S3 filesystem in Arrow just wraps the AWS S3 SDK; maybe we can find existing solutions on the S3 side and port them over?

eeroel (Contributor, Author) commented Nov 15, 2023

I mean that maybe some of the flags in https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html would help, but I didn't test them. The S3 filesystem in Arrow just wraps the AWS S3 SDK; maybe we can find existing solutions on the S3 side and port them over?

Right, I couldn't find any AWS flag that would help here. There is also this issue about switching to the S3CrtClient in the AWS SDK: #18923

I actually tried the CRT client with Arrow; it's a drop-in replacement, and in my limited test cases it does give higher throughput than the old S3Client. I'm not sure what the main sources of the performance boost are, but it handles IP address discovery in some clever way and automatically splits GET requests into smaller ranges and parallelizes them. The main problem I've noticed is that it does a HEAD request for each byte-range GET in order to validate the range, so this would be an issue for low-latency use cases (I've been hoping that HEAD requests could be eliminated 😅).

mapleFU (Member) commented Nov 15, 2023

Let's move #37868 forward...

@drjohnrbusch

@eeroel We want to performance-test Arrow with the AWS CRT client as you did. You said: "I actually tried the CRT client with Arrow; it's a drop-in replacement, and in my limited test cases it does give higher throughput than the old S3Client." Can you provide us with a patch or code fragment for Arrow to use the CRT? Thanks!

eeroel (Contributor, Author) commented Mar 25, 2024

@drjohnrbusch Sure! It's here; it's not up to date, but I think it should compile: https://github.com/eeroel/arrow/tree/feat/use_crt
