
aiohttp.client_exceptions.ServerDisconnectedError does not seem to be handled correctly #537

Closed
orf opened this issue Oct 11, 2021 · 25 comments


@orf
Contributor

orf commented Oct 11, 2021

What happened:

A user reports that running:

s3 = s3fs.S3FileSystem(anon=False)
s3.get("s3://bucket/prefix", "/tmp/some-dir/", recursive=True)

on a prefix with ~120,000 files (~4 GB) results in an aiohttp.client_exceptions.ServerDisconnectedError.

What you expected to happen:

The copy should succeed

Minimal Complete Verifiable Example:

s3 = s3fs.S3FileSystem(anon=False)
content = b"h" * (1024 * 1024 * 20)  # 20 MB per object
s3.pipe("s3://bucket/prefix/initial", value=content)

for i in range(120_000):
    s3.cp("s3://bucket/prefix/initial", f"s3://bucket/prefix/{i}")

Then:

s3 = s3fs.S3FileSystem(anon=False)
s3.get("s3://bucket/prefix/", "/tmp/some-dir/", recursive=True)

Traceback:

  File "/Users/tom/PycharmProjects/gitlab/x/x/flows/test.py", line 16, in copy_artifact
    artifact.s3fs_client.get(
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/fsspec/asyn.py", line 91, in wrapper
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/fsspec/asyn.py", line 71, in sync
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/fsspec/asyn.py", line 25, in _runner
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/fsspec/asyn.py", line 459, in _get
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/fsspec/asyn.py", line 213, in _run_coros_in_chunks
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 614, in _wait_for_one
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/s3fs/core.py", line 976, in _get_file
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/s3fs/core.py", line 268, in _call_s3
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/s3fs/core.py", line 248, in _call_s3
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/client.py", line 141, in _make_api_call
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/client.py", line 161, in _make_request
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 102, in _send_request
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 148, in _do_get_response
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/endpoint.py", line 230, in _send
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiobotocore/httpsession.py", line 155, in send
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiohttp/client.py", line 544, in _request
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiohttp/client_reqrep.py", line 890, in start
  File "/home/app/.cache/pypoetry/virtualenvs/x-QbZPvmwz-py3.9/lib/python3.9/site-packages/aiohttp/streams.py", line 604, in read
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected

Anything else we need to know?:

Possibly related to aio-libs/aiohttp#4549?

Environment:

  • Dask version: Latest
  • Python version: 3.9
  • Operating System: Linux
  • Install method (conda, pip, source): Pip
@martindurant
Member

Does this happen with smaller number or size of files, so we have a better chance to investigate?

I can't tell whether the problem happens within the file listing stage or the downloads - do any files appear? If yes, you might try passing batch_size to get to limit the strain on the asyncio event loop.
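For example, something along these lines (the value 128 and the bucket/prefix paths are placeholders, not a recommendation):

import s3fs

s3 = s3fs.S3FileSystem(anon=False)
# batch_size caps how many download coroutines are in flight at once
s3.get("s3://bucket/prefix", "/tmp/some-dir/", recursive=True, batch_size=128)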

@orf
Contributor Author

orf commented Oct 11, 2021

Ok, I see the issue. The _get_batch_size() method here returns 131072, which is... rather large.

We'll set a default in our library, but I don't think that the fsspec default is something we should rely on. Perhaps we could set a more sensible upper limit in the S3FileSystem constructor?

@orf
Contributor Author

orf commented Oct 11, 2021

Also it seems that batch_size can't be set in the constructor?

> fs = s3fs.S3FileSystem(anon=False, batch_size=100)
> fs.batch_size  # None
> fs.ls("s3://foo")
...
  File "/Users/tom/Library/Caches/pypoetry/virtualenvs/x/lib/python3.9/site-packages/s3fs/core.py", line 563, in _lsdir
    await self.set_session()
  File "/Users/tom/Library/Caches/pypoetry/virtualenvs/x/lib/python3.9/site-packages/s3fs/core.py", line 382, in set_session
    self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'batch_size'

It looks like it should be passed if we whitelist it here: https://github.com/dask/s3fs/blob/main/s3fs/core.py#L203
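To illustrate the pattern (a rough sketch only; the key names below are my assumption, not the actual whitelist): the constructor forwards a small set of recognised kwargs to the fsspec base class and hands everything else to aiobotocore's AioSession, which is why batch_size ends up triggering the TypeError above.

# Sketch: split constructor kwargs so fsspec-level options never reach AioSession.
def split_constructor_kwargs(kwargs):
    fsspec_keys = {"use_listings_cache", "listings_expiry_time", "max_paths", "batch_size"}
    fsspec_kwargs = {k: kwargs.pop(k) for k in list(kwargs) if k in fsspec_keys}
    return fsspec_kwargs, kwargs  # the remainder would go to AioSession

fsspec_kwargs, session_kwargs = split_constructor_kwargs(
    {"batch_size": 100, "region_name": "us-east-1"}
)
print(fsspec_kwargs)   # {'batch_size': 100}
print(session_kwargs)  # {'region_name': 'us-east-1'}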

@martindurant
Member

@isidentical , I think you'll agree that the above is sensible?

@isidentical
Member

The _get_batch_size() method here returns 131072, which is... rather large.

Oh, it is indeed very large. Maybe we should change this:

else:
    return soft_limit // 8

to something like this:

else:
    return min(soft_limit // 8, _DEFAULT_BATCH_SIZE)

128 is a reasonable number, and even if it feels small, people can override it, since this function only gets called when no batch size is specified as an argument and none is present in the config.

It looks like it should be passed if we whitelist it here: https://github.com/dask/s3fs/blob/main/s3fs/core.py#L203

Indeed. I don't see why we need such a filter (CC: @martindurant), since on other filesystems I simply pass all the kwargs to the base class, but if we have a reason, we should just add it there as well. Thanks for noticing this, and for doing the research!

@orf
Contributor Author

orf commented Oct 11, 2021

No problem! I honestly think 128 is too small as a default. S3 is pretty performant; we've changed the default to 5,000 and it's scaling fine.

To put it another way, we're constrained here by the number of tasks a Python event loop can handle rather than the number of requests S3 can accept. 128 is pretty low for both; maybe 1,024 would be a more reasonable default?

@martindurant
Member

I have certainly never met any limiting behaviour when making >1000 requests simultaneously. I would tend towards that kind of number.

@isidentical
Member

No problem! I honestly think 128 is too small as a default - S3 is pretty performant, we've changed the default to be 5,000 and it's scaling fine.

Interesting. Is it possible to test this with, for example, 500 concurrent tasks? It would provide useful data for determining the limit.

To put it another way, we're constrained here by the number of tasks a Python event loop can handle rather than the number of requests S3 can accept.

Unfortunately not. This batching system was initially implemented due to open file limit errors. The current detection is simply the maximum number of files a process can open, divided by 8. Maybe we could change both the normal calculation and the upper limit. How about min(soft_limit // 2, 768) or something similar? I'd argue 5,000 seems like a very large number, unless the difference compared to 768 or 1024 is big enough.

@orf
Contributor Author

orf commented Oct 11, 2021

S3 doesn't really have a request limit. At least not one we can determine. If you've got a cold bucket (1 rps) and you suddenly hit it with 5,000 requests per second it will tell you to back off and begin scaling up. But if you've got a hot bucket (~1,000 rps) you can burst up to ~30k rps without issue.

The best way would be to dynamically adjust the batch size until you start hitting "go away" errors, and adjust from there. But that's going to be pretty complex.

Currently we have a very large upper bound and this issue hasn't been raised before. I think min(soft_limit // 2, 1024) is going to be perfectly acceptable.
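For reference, a quick way to see what the current heuristic and the proposed cap would give on a given machine (POSIX only, since it uses the resource module):

import resource

# Soft limit on open files for this process; on the box that hit the bug it
# was presumably ~1,048,576, hence 1_048_576 // 8 == 131_072.
soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
print("current heuristic:", soft_limit // 8)
print("proposed cap:", min(soft_limit // 2, 1024))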

@martindurant
Member

Can someone please propose that the general throttle limit be ~>1000, but that the limit for actions requiring local files (get, put) be the current value, which should account for open-file limits.

@martindurant
Member

@isidentical , actually, I'd like you to do this, if you have some time: _run_coros_in_chunks is only called from _get and _put because of the open files limit. From this discussion, it seems like it would be a good idea to throttle all gather calls, but with a much bigger limit ~>1000. I note that s3fs and gcsfs also call gather directly, and the ones in _rm might actually matter.

@orf, does increasing S3FileSystem.retries help?

@isidentical
Member

@isidentical , actually, I'd like you to do this, if you have some time: _run_coros_in_chunks is only called from _get and _put because of the open files limit. From this discussion, it seems like it would be a good idea to throttle all gather calls, but with a much bigger limit ~>1000. I note that s3fs and gcsfs also call gather directly, and the ones in _rm might actually matter.

Then it makes sense to expose this publicly from fsspec.asyn, with a better API (instead of automatically inferring the batch size from the open file limit, just use 1024 statically, and pass batch_size=_get_batch_size() on put/get). I'd probably call it batch_run() with a default of 1024.
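Roughly something like this (a sketch of the proposed helper, not an existing fsspec API):

import asyncio

async def batch_run(coros, batch_size=1024):
    # Run the coroutines in fixed-size batches so the event loop never has
    # more than batch_size tasks in flight at once.
    results = []
    for i in range(0, len(coros), batch_size):
        results.extend(await asyncio.gather(*coros[i:i + batch_size]))
    return results

async def main():
    # toy usage: 5000 trivial coroutines, run 1024 at a time
    print(len(await batch_run([asyncio.sleep(0, result=i) for i in range(5000)])))

asyncio.run(main())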

@martindurant
Member

Agreed. The default could be different for file and non-file cases, and should be configurable (we currently have the "gather_batch_size" key).
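For anyone hitting this now, that config key can be set directly, e.g. (assuming it is read whenever no explicit batch_size argument is given):

import fsspec.config

# Global override picked up when no batch_size is passed explicitly
fsspec.config.conf["gather_batch_size"] = 1024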

@artttt

artttt commented Feb 22, 2022

I might be overlooking something in the discussion above, but I'm not seeing a clear workaround I can apply for this ServerDisconnectedError. Is there one?
I'm occasionally hitting this error when trying to write to zarr from xarray.
The code I'm using is along the lines of the following.
Thanks

fs = s3fs.S3FileSystem(
    anon=False, s3_additional_kwargs={"ACL": "bucket-owner-full-control"}
)
store = s3fs.S3Map(root=s3path, s3=fs, check=False)
ds.to_zarr(store, consolidated=True)

@martindurant
Member

@orf , are you thinking that this should be a case that is retried, but currently is not?

@orf
Contributor Author

orf commented Feb 22, 2022

Ok, I see the issue. The _get_batch_size() method here returns 131072, which is... rather large.

We'll set a default in our library, but I don't think that the fsspec default is something we should rely on. Perhaps we could set up a more sensible upper-limit in the S3FileSystem constructor?

@martindurant I think this is the issue. Launching 131,072 concurrent tasks, each rushing to send an individual HTTPS request to S3, is a bit optimistic. I would guess the event loop stalls, and the stalls result in a cascade of ServerDisconnectedErrors. This should be capped at a more reasonable level; 1024 was suggested above.

But yes, it probably should be added to the set of retriable exceptions here: https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L52, but I doubt this will fix the underlying issue?
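As a stop-gap, something like this monkey-patch may help. It assumes the module-level S3_RETRYABLE_ERRORS tuple linked above is what the retry logic consults at call time, and it won't cover disconnects that happen mid-stream (see below):

import s3fs.core
from aiohttp.client_exceptions import ClientPayloadError, ServerDisconnectedError

# Extend the exceptions s3fs retries with backoff (apply before creating the filesystem)
s3fs.core.S3_RETRYABLE_ERRORS = tuple(s3fs.core.S3_RETRYABLE_ERRORS) + (
    ServerDisconnectedError,
    ClientPayloadError,
)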

@martindurant
Member

OK, so may I suggest that you make a PR here to add the exception type to those which get retried.

Ok, I see the issue. The _get_batch_size() method here returns 131072, which is... rather large.

Can you figure out how this happened? We have the reasonable upper limit of _NOFILES_DEFAULT_BATCH_SIZE = 1280 set a few lines above.

@artttt

artttt commented Feb 23, 2022

If I'm understanding correctly, there are two issues at play here:

  1. S3 isn't liking a large number of concurrent HTTPS requests, and there is some work to do to make s3fs make sensible use of batching and retries. I'm happy to test these fixes with data that is giving me the error if someone gives some pointers on how to patch this on my Dask cluster.

I note gcsfs tackled the retries issue for this error last year fsspec/gcsfs#385

  2. xarray's to_zarr is producing a silly amount of HTTP requests. Maybe I should be looking there to avoid this issue in the first place. The data I'm using is quite sparse, so lots of chunks will be only nodata; I wonder if this is resulting in a very rapid series of attempts to write chunks. I guess I will go looking in xarray and zarr for a way to avoid triggering this error with sparse data.

@martindurant
Member

I don't believe 2. is an issue. zarr does not produce the HTTP calls. fsspec/s3fs does.

I see the following two issues:

  • we do not recognise and retry following this specific type of exception, but should, using the usual exponential backoff scheme
  • somehow, the batch size for async submission is far too big; we should not allow bigger batches than the default 1280 unless explicitly overridden. I would still be interested to know how that happened.

@orf
Contributor Author

orf commented Mar 1, 2022

One thing to note here: _get_file doesn't seem to obey S3_RETRYABLE_ERRORS

s3fs/s3fs/core.py

Lines 1019 to 1041 in 6f844d4

async def _get_file(
    self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
):
    bucket, key, vers = self.split_path(rpath)
    if os.path.isdir(lpath):
        return
    resp = await self._call_s3(
        "get_object",
        Bucket=bucket,
        Key=key,
        **version_id_kw(version_id or vers),
        **self.req_kw,
    )
    body = resp["Body"]
    callback.set_size(resp.get("ContentLength", None))
    try:
        with open(lpath, "wb") as f0:
            while True:
                chunk = await body.read(2**16)
                if not chunk:
                    break
                segment_len = f0.write(chunk)
                callback.relative_update(segment_len)

Which means both ServerDisconnectedError and ClientPayloadError go unretried if the server disconnects during await body.read(2**16).

@martindurant
Member

Hm, I see - _get_file has the same retry logic as everything else when starting the call via _call_s3, but a disconnect can happen some time later while reading the stream. Ideally, we'd like to be able to restart the transfer wherever it left off, so this is a little tricky to code. @orf, would you like to try implementing an outer loop that restarts the download should the local file not reach the required size?
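Roughly, such an outer loop might look like this (a sketch only, not the eventual PR: it restarts the whole transfer on the retryable errors rather than comparing the local file size against the expected length, and it calls the internal _get_file quoted above):

import asyncio
from aiohttp.client_exceptions import ClientPayloadError, ServerDisconnectedError

RETRYABLE = (ServerDisconnectedError, ClientPayloadError)

async def get_file_with_restart(fs, rpath, lpath, retries=5):
    # Restart the download from scratch if the stream drops mid-read,
    # backing off a little more on each attempt.
    for attempt in range(retries):
        try:
            return await fs._get_file(rpath, lpath)
        except RETRYABLE:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(min(1.7**attempt * 0.1, 15))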

@orf
Contributor Author

orf commented Mar 2, 2022

Sure! I can whip up a PR for that.

@artttt

artttt commented Mar 3, 2022

I thought it might help if I produced an example that generally triggers this error when I'm using zarr. It's obviously a silly example, but it seems to fail most of the time.

I run this with a 15-worker Dask cluster on EC2.

import xarray as xr
import s3fs

fs = s3fs.S3FileSystem(
    anon=False, s3_additional_kwargs={"ACL": "bucket-owner-full-control"}
)
s3path = f's3://.........../testing_rubbish/test1_zarr'
store = s3fs.S3Map(root=s3path, s3=fs, check=False)

da = xr.DataArray(coords=dict(x=range(0), y=range(0)), dims=("x", "y"))
da = da.chunk({"x": 10000, "y": 10000})
da = da.pad(pad_width={"x": (0, 1000000), "y": (0, 1000000)})
da = da.chunk({"x": 10000, "y": 10000})
ds = da.to_dataset(name="data")
z = ds.to_zarr(store, mode="w")

@orf
Contributor Author

orf commented Mar 7, 2022

I added a fix here: #601

I've also seen FSTimeoutError thrown, which I also added to the retryable exceptions:

...
  File "x-packages/s3fs/core.py", line 1002, in _get_file
    chunk = await body.read(2 ** 16)
  File "x-packages/aiobotocore/response.py", line 55, in read
    raise AioReadTimeoutError(endpoint_url=self.__wrapped__.url,
aiobotocore.response.AioReadTimeoutError: Read timeout on endpoint URL: "s3 URL here"
...
    self.s3fs_client.get(self.s3_uri, str(directory), recursive=True)
  File "x-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "x-packages/fsspec/asyn.py", line 69, in sync
    raise FSTimeoutError from return_result

@artttt

artttt commented Apr 5, 2022

Just wanted to comment that I've been using @orf's fix and it is working well for me.
Thanks for your work.
