should adlfs be responsible for the event loop management for threads? #218

Closed
isidentical opened this issue Apr 16, 2021 · 10 comments


@isidentical
Member

Here is an example that fails with "there is no set event loop for thread: <thread name>":

import threading
from azure.identity.aio import DefaultAzureCredential

from adlfs import AzureBlobFileSystem

def main():
    fs = AzureBlobFileSystem(
        account_name='hey',
        credential=DefaultAzureCredential()
    )
    fs.ls('/')

thread = threading.Thread(target=main)
thread.start()

The cause of this issue is that the thread doesn't have an event loop set, but AzureBlobFileSystem.do_connect() tries to instantiate classes that require access to a set event loop (e.g. AIOBlobServiceClient).
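The same failure can be reproduced without Azure at all. This is a minimal sketch, assuming (not verified against the SDK source) that the Azure async clients end up asking for the thread's current loop via asyncio.get_event_loop(); a plain worker thread has no loop set, so the call raises:

```python
import asyncio
import threading


def main(results):
    # Stand-in for the Azure SDK looking up the thread's current event loop.
    try:
        asyncio.get_event_loop()
        results.append("found a loop")
    except RuntimeError as exc:
        # Worker threads get no default loop, so this branch runs here.
        results.append(f"RuntimeError: {exc}")


results = []
thread = threading.Thread(target=main, args=(results,), name="worker")
thread.start()
thread.join()
print(results[0])
```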

This can easily be solved with asyncio.new_event_loop/asyncio.set_event_loop, though I am unsure whether the client should manage this, or whether AzureBlobFileSystem.do_connect should special-case threads that don't have a loop and manage one itself.
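A minimal sketch of the client-side option, assuming the caller owns the worker thread and can give it a loop before touching the filesystem. The body of main() is a stand-in for AzureBlobFileSystem(...) / fs.ls('/'), not real adlfs calls:

```python
import asyncio
import threading


def main(results):
    # Caller-managed workaround: register a fresh loop for this worker thread
    # before running any code that needs one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Stand-in for the filesystem work: look up the current loop and
        # run a coroutine on it.
        current = asyncio.get_event_loop()
        results.append(current is loop)
        results.append(current.run_until_complete(asyncio.sleep(0, result="listed")))
    finally:
        # Unregister and close the loop when the thread's work is done.
        asyncio.set_event_loop(None)
        loop.close()


results = []
thread = threading.Thread(target=main, args=(results,))
thread.start()
thread.join()
print(results)
```

The trade-off is that every caller that spawns threads has to remember this, which is the motivation for asking whether adlfs should do it instead.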

@hayesgb
Collaborator

hayesgb commented May 7, 2021

@isidentical -- thanks for bringing this forward. Do you know if it started after #198? Tagging @martindurant to find out whether this is just an issue with adlfs or whether it impacts other packages that use fsspec.

@isidentical
Member Author

> @isidentical -- thanks for bringing this forward. Do you know if it started after #198?

I don't know for sure, but I don't think it is related to that.

@martindurant
Member

What version of fsspec are you running against here?

@isidentical
Member Author

> What version of fsspec are you running against here?

I don't think it is related to fsspec, since both __init__ and do_connect() are synchronous functions. However, in do_connect, DefaultAzureCredential tries to decide which authentication option to use, and to do that it tries to get the event loop to run some code. That fails because threads don't have an event loop set. I think the easiest path would be to set the event loop either in AsyncFileSystem.__init__ (which would make this an fsspec issue) or in AzureBlobFileSystem.__init__ / do_connect.

@hayesgb
Collaborator

hayesgb commented May 7, 2021

@martindurant -- it's fsspec == 2021.4.0, and after spending a little more time on it, I agree with @isidentical that the issue seems to be related to the Azure SDK implementation.

For me, calling dd.read_csv("az://<filepath>").compute() with Dask dataframes in a local environment triggers the issue with async service principal credentials, while dd.read_csv("az://<filepath>").compute(scheduler='single-threaded') works as expected.

With a LocalCluster or a remote cluster, it works as expected. It appears that, with a distributed Client running, the Azure library can access a loop running in the distributed cluster's thread.

Setting the loop in do_connect() as

try:
    loop = asyncio.get_running_loop()
    <connect code>
except RuntimeError:
    <call private function to set loop, and use a try/finally block to run `do_connect`, then close the loop>

fixes the example given above but, as pointed out in the dvc PR, does not fix the dd.read_csv().compute() issue. It would need to be reimplemented for fs.open(). Alternatively, I can add a utility function that reimplements the elegant context-manager solution used in #5958. Thoughts?
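The try/except pattern above could be packaged as a context manager. This is a hypothetical sketch: temporary_event_loop is not an adlfs or fsspec function, and it is not the code from #5958. Note that in synchronous code such as do_connect(), asyncio.get_running_loop() always raises, so the new-loop branch is the one that actually runs:

```python
import asyncio
import contextlib
import threading


@contextlib.contextmanager
def temporary_event_loop():
    """Hypothetical helper: yield a usable event loop for the current thread.

    If a loop is already running in this thread it is yielded unchanged.
    Otherwise a new loop is created, set as the thread's current loop, and
    closed on exit. A previously set but non-running loop is not restored.
    """
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None
    if running is not None:
        yield running
        return
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        yield loop
    finally:
        asyncio.set_event_loop(None)
        loop.close()


def do_connect_stand_in(results):
    # Hypothetical stand-in for do_connect(): code that needs a current loop.
    with temporary_event_loop() as loop:
        results.append(asyncio.get_event_loop() is loop)
        results.append(loop.run_until_complete(asyncio.sleep(0, result="connected")))
    # After the block the loop is closed; nothing can run on it any more.
    results.append(loop.is_closed())


results = []
thread = threading.Thread(target=do_connect_stand_in, args=(results,))
thread.start()
thread.join()
print(results)
```

One consequence of closing the loop on exit is that objects created inside the block that keep a reference to it cannot use it afterwards, which is consistent with the concern that later calls such as fs.open() would need the same treatment.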

@martindurant
Member

Since fsspec is certainly available, feel free to reuse its loop (running in its own thread) for all IO, which should make you immune to how Dask happens to be running. fsspec sets things up here, unless a loop is passed to AsyncFileSystem or asynchronous=True is set.
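The idea of one shared loop in a dedicated thread can be sketched with the standard library. This is an illustration of the pattern only; get_io_loop and run_sync are hypothetical names, not fsspec's source (fsspec's own helpers mentioned in this thread are get_loop() and sync). Callers in any thread hand coroutines to that loop with asyncio.run_coroutine_threadsafe and block on the result, so they never need a loop of their own:

```python
import asyncio
import threading

# Hypothetical module-level state for a single, shared IO loop.
_io_loop = None
_io_lock = threading.Lock()


def get_io_loop():
    """Return the shared event loop, starting it in a daemon thread on first use."""
    global _io_loop
    with _io_lock:
        if _io_loop is None:
            loop = asyncio.new_event_loop()
            threading.Thread(target=loop.run_forever, name="io-loop", daemon=True).start()
            _io_loop = loop
    return _io_loop


def run_sync(coro_func, *args, timeout=None, **kwargs):
    """Run coro_func(*args, **kwargs) on the shared loop and block for the result."""
    future = asyncio.run_coroutine_threadsafe(coro_func(*args, **kwargs), get_io_loop())
    return future.result(timeout)


async def loop_thread_name():
    # Report which thread the coroutine actually executes on.
    await asyncio.sleep(0)
    return threading.current_thread().name


# Call from several plain worker threads, none of which has its own event loop.
names = []
names_lock = threading.Lock()


def worker():
    name = run_sync(loop_thread_name)
    with names_lock:
        names.append(name)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(names)
```

Making the loop thread a daemon means it never blocks interpreter exit; the trade-off is that any IO still pending at shutdown is abandoned.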

@hayesgb
Collaborator

hayesgb commented May 7, 2021

get_loop() is used in adlfs and works well. I'm trying to understand whether #197 contributes to the issue. If I revert to:

AzureBlobFileSystem.loop = loop or get_loop()

it fixes the example above, but still does not fix the dd.read_csv().compute() issue.
@martindurant -- can you comment on whether and how 572 could play a role?

@martindurant
Member

Exactly. fsspec now has a single loop in a dedicated thread, as it used to, whereas for a brief while it had one loop per thread. You can use fsspec's sync to run coroutines on that thread.

asyncio.get_event_loop will not return the correct loop, since that loop is running in another thread (it will create a new default loop in the main thread, or raise an error in other threads). get_running_loop will always fail except when called from within a coroutine, regardless of how the loop was defined.
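The behavior described above can be checked with the standard library alone. In this sketch a loop running in a background thread stands in for fsspec's IO loop (an assumption for illustration). It only probes get_event_loop() from a worker thread, because the main-thread behavior has changed across Python versions (newer releases raise instead of creating a default loop):

```python
import asyncio
import threading

# A loop running forever in a background thread, standing in for fsspec's IO loop.
io_loop = asyncio.new_event_loop()
io_thread = threading.Thread(target=io_loop.run_forever, name="io-loop", daemon=True)
io_thread.start()


def has_running_loop():
    """True only when a loop is running in the *current* thread."""
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


async def check_inside_coroutine():
    return has_running_loop()


observations = {}
# Plain synchronous code: no loop is running in this thread, even though io_loop runs elsewhere.
observations["sync code"] = has_running_loop()
# The same check from inside a coroutine executing on io_loop succeeds.
observations["coroutine"] = asyncio.run_coroutine_threadsafe(
    check_inside_coroutine(), io_loop
).result()


def worker():
    # get_event_loop() in a fresh worker thread does not find io_loop; it raises.
    try:
        observations["worker"] = asyncio.get_event_loop() is io_loop
    except RuntimeError:
        observations["worker"] = "RuntimeError"


t = threading.Thread(target=worker)
t.start()
t.join()

# Shut the background loop down cleanly.
io_loop.call_soon_threadsafe(io_loop.stop)
io_thread.join()
io_loop.close()
print(observations)
```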

@hayesgb
Collaborator

hayesgb commented May 7, 2021

Thanks! That was the piece I needed. Appreciate the input!

@hayesgb
Collaborator

hayesgb commented May 11, 2021

This should be fixed by #229

@hayesgb hayesgb closed this as completed May 11, 2021
3 participants