GenericFileSystem Buffered Copy #1578

Open
ryaminal opened this issue Apr 17, 2024 · 6 comments

@ryaminal

Hi all. Love fsspec.

I'm trying to use GenericFileSystem like this:

import fsspec
import fsspec.generic

# Instantiate the SFTP filesystem first. With default_method="current", the
# generic filesystem reuses the most recently created instance per protocol.
fs, _ = fsspec.url_to_fs("sftp://username@host")

fsspec.generic.rsync(
    "sftp:///stuff",  # only the protocol and path are used here; the username and host are discarded
    "gs://bucket/dir1/dir2",  # don't add the trailing slash, unless you want double slashes in your path!
    inst_kwargs={"default_method": "current"},
)

This currently goes through the fsspec.generic.GenericFileSystem._copy method, which by default stages each file in a temp file on local disk (e.g. sftp -> local -> gs). This is undesirable for my use case.

Assumption 1

Looking at GenericFileSystem, there is a buffering implementation in fsspec.generic.GenericFileSystem._cp_file. However, I don't think that method will ever be called, because _copy has been implemented (unless a call to it is added to _copy).

Force _cp_file to be used

If I remove _copy (by renaming it to __copy), then _cp_file is in fact called, but there is a problem: open_async is not implemented by either of the filesystems in my example (sshfs.spec.SSHFileSystem and gcsfs.core.GCSFileSystem).

Assumption 2

This is very confusing to me, because SSHFileSystem and GCSFileSystem both extend fsspec.asyn.AsyncFileSystem, but neither of them implements open_async. So when GenericFileSystem._cp_file runs its if hasattr(fs, "open_async") checks, they return True because the filesystems technically have that attribute/method (inherited), even though it is not implemented.
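
To illustrate why the hasattr check passes, here is a minimal sketch that does not use fsspec at all. AsyncBase, NoStreaming, WithStreaming and overrides_open_async are hypothetical names, and the sketch assumes the base class ships a placeholder open_async that raises NotImplementedError, as the behaviour described above suggests. Comparing the method against the base class's placeholder is one possible stricter check.

```python
class AsyncBase:
    """Hypothetical stand-in for a base class such as AsyncFileSystem."""

    async def open_async(self, path, mode="rb", **kwargs):
        # Placeholder only: subclasses are expected to override this.
        raise NotImplementedError


class NoStreaming(AsyncBase):
    """Inherits the placeholder without overriding it."""


class WithStreaming(AsyncBase):
    async def open_async(self, path, mode="rb", **kwargs):
        return f"stream:{path}"


def overrides_open_async(fs, base=AsyncBase):
    """True only if the class of fs supplies its own open_async."""
    return type(fs).open_async is not base.open_async


plain = NoStreaming()
streaming = WithStreaming()

# hasattr cannot tell the two apart; the identity check can.
has_attr_plain = hasattr(plain, "open_async")
has_attr_streaming = hasattr(streaming, "open_async")
really_plain = overrides_open_async(plain)
really_streaming = overrides_open_async(streaming)
```

In this sketch hasattr is true for both instances, while overrides_open_async is true only for the class that actually implements the method.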

Force sync open

If a sync open is forced in GenericFileSystem._cp_file, then a different error is raised:

NotImplementedError: Calling sync() from within a running loop

This happens because we are in an async context but trying to call a sync method.
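
The failure mode can be reproduced without any filesystem. The sync() helper below is a simplified, hypothetical stand-in for a sync-over-async wrapper (it is not fsspec's implementation); it only assumes that such a wrapper refuses to run when it is called from the loop it is supposed to drive.

```python
import asyncio


def sync(loop, coro_func, *args):
    """Simplified stand-in: run a coroutine function to completion on loop."""
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # no loop is running in this thread
    if running is loop:
        # Blocking here would stall the very loop that must run the coroutine.
        raise NotImplementedError("Calling sync() from within a running loop")
    return loop.run_until_complete(coro_func(*args))


async def read_chunk():
    return b"data"


def blocking_read(loop):
    """A sync API built on top of the async one."""
    return sync(loop, read_chunk)


async def copy_one_file(loop):
    """Async code that calls the sync API, as a forced sync open would."""
    try:
        return blocking_read(loop)
    except NotImplementedError as exc:
        return str(exc)


loop = asyncio.new_event_loop()
outside = blocking_read(loop)                          # plain sync context
inside = loop.run_until_complete(copy_one_file(loop))  # inside the loop
loop.close()
```

Called from ordinary sync code the helper returns the data; called from a coroutine running on the same loop it raises the error quoted above.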

Question

What to do here?

  1. Implement my own rsync that doesn't try to be generic? (kind of already did this, heavily based on GenericFileSystem)
  2. Figure out how to do non-async in an async context, for now, and incur the performance "penalty"
  3. Implement open_async in both SSHFileSystem and GCSFileSystem

@martindurant
Member

Indeed, it has proven hard to implement open_async even for filesystems that are fundamentally async. Just because each operation is async doesn't necessarily mean you can leave a connection open in the background. Particularly, there seem to be no good ways to write async to a file (more below). The file caching process seemed like a good intermediate place, where we can stream from multiple files and write to multiple files in batches and not be limited by strict streaming.

Details: aiohttp allows for async writes using a pull pattern where you can provide an async generator. However, our situation is push because we are waiting on reads from another async stream, so the typical await f2.write(await f.read()) doesn't work. I don't know how to get around this.
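
The pull/push mismatch can be made concrete with a toy model. In the sketch below, pull_writer stands in for an upload API that iterates the body itself, and PushAdapter is a hypothetical bridge that exposes write()/close() and feeds a bounded queue. Nothing here is fsspec or aiohttp code, and the thread does not confirm that such a bridge would survive real constraints such as retries or the buffering logic of the existing file classes.

```python
import asyncio


async def pull_writer(chunks):
    """Stand-in for a pull-style upload: it consumes an async iterator."""
    received = []
    async for chunk in chunks:
        received.append(chunk)
    return b"".join(received)


class PushAdapter:
    """Hypothetical bridge from push-style write() calls to a pull consumer."""

    def __init__(self, maxsize=4):
        self._queue = asyncio.Queue(maxsize=maxsize)

    async def write(self, data):
        # Waits when the consumer falls behind, which gives backpressure.
        await self._queue.put(data)

    async def close(self):
        await self._queue.put(None)  # sentinel marking end of stream

    async def chunks(self):
        while (item := await self._queue.get()) is not None:
            yield item


async def main():
    adapter = PushAdapter()
    # The pull side runs as its own task while this coroutine pushes.
    upload = asyncio.create_task(pull_writer(adapter.chunks()))
    for part in (b"ab", b"cd", b"ef"):  # pretend these came from awaited reads
        await adapter.write(part)
    await adapter.close()
    return await upload


result = asyncio.run(main())
```

The design choice here is the bounded queue: it keeps memory use flat, at the cost of running the consumer as a separate task whose lifetime has to be managed.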

@ryaminal
Author

@martindurant, that context is great, thanks.
at first glance it seems so trivial to implement open_async but then reality hits and it's a pretty tricky problem to try and implement "generically".

curious about the 3 options i listed above.

i think the "fastest" win(selfishly for me) would be to figure out how to get GenericFileSystem._cp_file to work with sync even though gcsfs and sshfs both implement AsyncFileSystem. but i'd love others' thoughts.

@Skylion007
Contributor

Can't we just use https://pypi.org/project/aioshutil/ for the aio version of shutil.copyfileobj?

@Skylion007
Contributor

Not too bad to implement on your own apparently: Tinche/aiofiles#61 (comment)
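
For reference, a hand-rolled async counterpart to shutil.copyfileobj might look roughly like this. It is a sketch rather than the code from the linked comment, and it assumes both file objects expose awaitable read(size) and write(data) methods. FakeAsyncFile is a hypothetical in-memory stand-in used only to make the example runnable.

```python
import asyncio


class FakeAsyncFile:
    """In-memory stand-in for a file with awaitable read()/write()."""

    def __init__(self, data=b""):
        self.buffer = bytearray(data)
        self.pos = 0

    async def read(self, size):
        chunk = bytes(self.buffer[self.pos:self.pos + size])
        self.pos += len(chunk)
        return chunk

    async def write(self, data):
        self.buffer.extend(data)
        return len(data)


async def copyfileobj_async(src, dst, blocksize=64 * 1024):
    """Copy src to dst one block at a time; returns the bytes copied."""
    copied = 0
    while True:
        chunk = await src.read(blocksize)
        if not chunk:  # empty read signals end of file
            break
        await dst.write(chunk)
        copied += len(chunk)
    return copied


src = FakeAsyncFile(b"0123456789" * 20_000)  # 200,000 bytes
dst = FakeAsyncFile()
copied = asyncio.run(copyfileobj_async(src, dst))
```

Note that this is the push pattern (await dst.write(...)), so it only helps where the destination really offers an async write().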

@martindurant
Member

aioshutil would have had the same pull problem I described above. We don't know how to make a generic async write() method without at least repeating all the current buffering logic in the various async files. A true stream (open connection) appears to be not possible.

i think the "fastest" win(selfishly for me) would be to figure out how to get GenericFileSystem._cp_file to work with sync even though gcsfs and sshfs both implement AsyncFileSystem.

It would be OK maybe. We'd be restricted to working on one file at a time, which works for cp_file() but very much not for copy().
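
One way to picture the one-file-at-a-time sync approach is to hand a blocking copy to a worker thread, so the event loop stays free while the copy runs. The sketch below uses only the standard library; cp_file_blocking is a hypothetical name, and the io.BytesIO objects stand in for files that would come from each filesystem's sync open(). Whether this is safe with a particular filesystem's own sync wrappers is not confirmed by the thread.

```python
import asyncio
import io
import shutil


async def cp_file_blocking(src, dst, blocksize=2**20):
    """Run a blocking, buffered copy in the default thread pool."""
    loop = asyncio.get_running_loop()
    # copyfileobj reads and writes blocksize bytes at a time, so no temp
    # file is needed; only one block is held in memory.
    await loop.run_in_executor(None, shutil.copyfileobj, src, dst, blocksize)


async def main():
    src = io.BytesIO(b"payload" * 1000)  # stand-in for the source file
    dst = io.BytesIO()                   # stand-in for the destination file
    await cp_file_blocking(src, dst)
    return dst.getvalue()


copied = asyncio.run(main())
```

Each copy occupies a worker thread for its whole duration, which matches the restriction described above: fine for a single cp_file(), poorly suited to a bulk copy() over many files.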
