fs.mkdir() is creating empty files on Azure Blob #137

Closed
ldacey opened this issue Nov 23, 2020 · 14 comments

ldacey commented Nov 23, 2020

What happened:
fs.mkdir() is creating empty files on Azure Blob.

A more detailed example of how this impacts writing pyarrow datasets can be found within the pyarrow issue below. Basically, empty files are being generated for each final partition field.

https://issues.apache.org/jira/projects/ARROW/issues/ARROW-10694

What you expected to happen:
fs.mkdir() should only create the folder and not generate empty files.

Minimal Complete Verifiable Example:

fs = fsspec.filesystem(protocol='abfs', account_name=base.login, account_key=base.password)

fs.mkdir("dev/test2/") #note the slash at the end, this creates a folder on azure blob
[2020-11-23 10:17:16,285] {spec.py:679} INFO - Returning a list of containers in the azure blob storage account

fs.mkdir("dev/test2") #this creates an empty file in the "dev" container

Anything else we need to know?:

Environment:
I am using the adlfs master branch right now (pip install git+https://github.com/dask/adlfs.git)

  • Dask version: 2.30
  • Python version: 3.8.6
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda-forge

ldacey commented Nov 23, 2020

Here is the output from adding print statements to the mkdir() logic. If I include the final slash in the path name then it creates the folder without any empty files. If I remove the final slash then both a folder and an empty file are created.

path = "dev/test2/"
delimiter = "/"

container_name, path = fs.split_path(path, delimiter=delimiter)
print(f"Container: {container_name}")
_containers = await fs._ls("dev")
_containers = [c["name"] for c in _containers]
print(f"_containers: {_containers}")
container_name_as_dir = f"{container_name}/"
print(f"Directory: {container_name}/")
if (container_name_as_dir not in _containers) and not path:
    print("Creating new container")
elif (container_name in [container_path.split("/")[0] for container_path in _containers]) and path:
    print("Creating prefix")
    print(f'List of containers: {[container_path.split("/")[0] for container_path in _containers]}')

Container: dev
_containers: ['dev/test1/', 'dev/test-dataset/', 'dev/test2/']
Directory: dev/
Creating prefix
List of containers: ['dev', 'dev', 'dev']

So when I create partitions with pyarrow, are we creating the folders before adding the filename somehow? Perhaps because we have incrementing filenames like part-0, part-1, and so on?

"Technically speaking, you can't have an empty folder in Azure Blob Storage as blob storage has a 2 level hierarchy - blob container and blob. You essentially create an illusion of a folder by prefixing the file name with the folder name you like e.g. assuming you want a style sheet (say site.css) in say css folder, you name the stylesheet file as css/site.css."

hayesgb commented Nov 23, 2020

The quote above is correct. Azure Storage does not have true directories. Azure treats the blob name as the entire path, not as a file nested inside a directory. Folders are surfaced as "BlobPrefixes", which exist purely for convenience.

I'm curious. If we create a partitioned parquet file with Dask, and pyarrow as the engine, we don't see the behavior you describe. Meaning,

dd.to_parquet(ddf, "abfs://container_name/my_file.parquet", engine="pyarrow")

returns a sequentially numbered set of parquet files within the my_file.parquet "folder".

ldacey commented Nov 24, 2020

I believe that Dask is still using the "old" pyarrow method of writing parquet data. I am only seeing these empty files generated when using the pyarrow.dataset write_dataset() method which uses adlfs. Adding @jorisvandenbossche

This writes empty files for each partition:

import pyarrow as pa
import pyarrow.dataset as ds

# table, schema, and fs are the same objects used throughout this thread
ds.write_dataset(data=table,
                 base_dir="dev/test-dataset6",
                 format="parquet",
                 partitioning=ds.HivePartitioning(pa.schema([("report_date", pa.date32())])),
                 schema=schema,
                 filesystem=fs,
                 file_options=None,
                 use_threads=True)

This does not create empty files (this is the older method):

import pyarrow.parquet as pq

pq.write_to_dataset(table=table,
                    root_path="dev/test-dataset5",
                    partition_cols=["report_date"],
                    filesystem=fs,
                    version="2.0",
                    use_dictionary=True,
                    compression="snappy",
                    flavor="spark",
                    )

The same table was used for both examples.

len(fs.ls("dev/test-dataset5"))
35

len(fs.ls("dev/test-dataset6"))
70
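
To confirm that the extra entries in test-dataset6 are zero-byte marker blobs, a quick check (sketch only, using the same fs object as above):

markers = [path for path, info in fs.find("dev/test-dataset6", detail=True).items()
           if info.get("size") == 0]
len(markers)  # counts the empty blobs among the 70 entries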

ldacey commented Feb 23, 2021

Is this something that can be fixed eventually? Right now I am listing the empty blobs and deleting any that do not match certain file patterns or extensions.
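
A minimal sketch of that cleanup (the function name and suffix list are illustrative, assuming fs is the same AzureBlobFileSystem as above):

def delete_empty_blobs(fs, root, keep_suffixes=(".parquet",)):
    # find(..., detail=True) returns {path: info}; zero-byte entries that are not
    # recognizable data files are treated as stray directory markers and removed
    for path, info in fs.find(root, detail=True).items():
        if info.get("size") == 0 and not path.endswith(keep_suffixes):
            fs.rm(path)

delete_empty_blobs(fs, "dev/test-dataset6")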

hayesgb commented Feb 23, 2021 via email

hayesgb commented Feb 24, 2021

@ldacey -- It looks to me like the issue only materializes if the folder being written is a nested folder (i.e. not a container). Is that consistent with your observation?

hayesgb commented Feb 27, 2021

@ldacey -- I've been working on this, and it works out to be a fairly significant change. I was thinking about a comment above -- that Dask uses the "old" way of writing parquet files.

The original approach was intended to align to Dask, such that writing:
dd.to_parquet("abfs://<my_container>/my_collection.parquet")

would yield a collection of partitioned parquet files below, such that:
my_collection.parquet <-- Folder
-part1.parquet
-part2.parquet
-part3.parquet
etc.

Getting to proper folders may end up looking like:
dd.to_parquet("abfs://<my_container>/my_collection.parquet")
with a partition structure like:
my_collection.parquet/ <-- Folder (notice the "/")
-part1.parquet
-part2.parquet
-part3.parquet
etc.

I'm trying to be sure I'm clear on the differences in expected behaviors.

hayesgb commented Mar 1, 2021

@ldacey -- Please take a look at the branch named "folders2". I believe this will address both the issue cited above and #186. I would appreciate your feedback.

ldacey commented Mar 3, 2021

Okay, testing with this environment:

pyarrow 3.0.0
fsspec 0.8.5
adlfs v0.6.3+7.g9c39408
pandas 1.2.2
numpy 1.20.1
turbodbc 4.1.2
adal 1.2.6

import pyarrow as pa
import pyarrow.dataset as ds
import pendulum

class TrackingDataset:
    def __init__(self, schema, filesystem):
        self.schema = schema
        self.partitioning = ds.HivePartitioning(self.schema)
        self.filesystem = filesystem

    def write_dataset(self, table, base_dir, source):
        fs = self.filesystem
        template = f'{source}-{pendulum.now().strftime("%Y%m%d%H%M%S")}'
        ds.write_dataset(table, base_dir, format='parquet', partitioning=self.partitioning, basename_template=f'{template}-{{i}}.parquet', filesystem=self.filesystem)
        files = fs.glob(fs.sep.join([base_dir, "**", f"{template}-*"]), details=False, invalidate_cache=True)
        return files

    
table_one = pa.table({'month_id': [202101, 202102, 202102, 202102], 
                      'date_id': [20210103, 20210201, 20210214, 20210219], 
                      'val': [1, 2, 3, 4]})

table_two = pa.table({'month_id': [202101, 202102, 202102, 202102], 
                      'date_id': [20210103, 20210211, 20210217, 20210219], 
                      'val': [5, 6, 7, 8]})

writer = TrackingDataset(schema=pa.schema({'month_id': pa.int64(), 'date_id': pa.int64()}), filesystem=fs)
table_1_files = writer.write_dataset(table_one, source="partition-1", base_dir='/dev/folder2')
table_2_files = writer.write_dataset(table_two, source="partition-2", base_dir='/dev/folder2')

That seems to have worked; there are no longer any empty files:

fs.find("dev/folder2")
['dev/folder2/month_id=202101/date_id=20210103/partition-1-20210303150418-0.parquet',
 'dev/folder2/month_id=202101/date_id=20210103/partition-1-20210303150752-0.parquet',
 'dev/folder2/month_id=202101/date_id=20210103/partition-1-20210303150840-0.parquet',
 'dev/folder2/month_id=202101/date_id=20210103/partition-2-20210303150419-0.parquet',
 'dev/folder2/month_id=202101/date_id=20210103/partition-2-20210303150752-0.parquet',
 'dev/folder2/month_id=202101/date_id=20210103/partition-2-20210303150840-0.parquet',
 'dev/folder2/month_id=202102/date_id=20210201/partition-1-20210303150418-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210201/partition-1-20210303150752-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210201/partition-1-20210303150840-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210211/partition-2-20210303150419-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210211/partition-2-20210303150752-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210211/partition-2-20210303150840-1.parquet',
 'dev/folder2/month_id=202102/date_id=20210214/partition-1-20210303150418-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210214/partition-1-20210303150752-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210214/partition-1-20210303150840-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210217/partition-2-20210303150419-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210217/partition-2-20210303150752-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210217/partition-2-20210303150840-2.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-1-20210303150418-3.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-1-20210303150752-3.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-1-20210303150840-3.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-2-20210303150419-3.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-2-20210303150752-3.parquet',
 'dev/folder2/month_id=202102/date_id=20210219/partition-2-20210303150840-3.parquet']

Regarding #186, I was able to get this to work on the dataset I wrote above:

import pyarrow.parquet as pq

filter_list = [['date_id', '=', 20210214]]
filter_expression = pq._filters_to_expression(filters=filter_list)
print(filter_expression)
dataset.to_table(filter=filter_expression)  # dataset is the one written to dev/folder2 above
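
The same filter can also be built directly as a pyarrow.dataset expression, rather than going through the private pq._filters_to_expression helper (an alternative sketch only):

import pyarrow.dataset as ds

filter_expression = ds.field("date_id") == 20210214
dataset.to_table(filter=filter_expression)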

But I still ran into the issue of to_table() not completing on a real dataset: this warning was displayed a few times and then it got stuck (one of my bigger datasets, though I was only filtering for a few days of data; there are no issues when I run the same code on 0.5.9). Not sure if it is due to the number of fragments/partitions?

TypeError: cannot create weak reference to 'NoneType' object
Exception ignored in: <function AbstractBufferedFile.__del__ at 0x7f1d9377edc0>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.8/site-packages/fsspec/spec.py", line 1538, in __del__
    self.close()
  File "/home/jovyan/.local/lib/python3.8/site-packages/adlfs/spec.py", line 1616, in close
    maybe_sync(self.container_client.close, self)
  File "/home/jovyan/.local/lib/python3.8/site-packages/fsspec/asyn.py", line 93, in maybe_sync
    return _run_until_done(func(*args, **kwargs))
  File "/home/jovyan/.local/lib/python3.8/site-packages/fsspec/asyn.py", line 27, in _run_until_done
    asyncio.tasks._unregister_task(task)
  File "/opt/conda/lib/python3.8/_weakrefset.py", line 114, in discard
    self.data.discard(ref(item))
TypeError: cannot create weak reference to 'NoneType' object
/opt/conda/lib/python3.8/asyncio/tasks.py:919: RuntimeWarning: coroutine 'AsyncStorageAccountHostsMixin.close' was never awaited
  futures._chain_future(ensure_future(coro, loop=loop), future)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

hayesgb commented Mar 8, 2021

@ldacey -- #193 includes the fix for the hive partitioning, migrates several async clients to async context managers, and adds error handling for the finalizer, which I believe will resolve your issue above. I've only seen the warning once, so your feedback would be appreciated.

The difference between 0.5.x and 0.6.x is that 0.6.x implements async operations for AzureBlobFile objects.

ldacey commented Mar 8, 2021

I installed the master branch and tested it. I had no issues with empty files when writing the example data from above, or when reading a moderately sized dataset into a table.

When I tried creating a table from a larger dataset (20,000 file fragments, but filtered down to a single partition) I still ran into a warning and it just got stuck. Do you think this is a separate issue, potentially caused by the sheer size of the dataset?

/home/jovyan/.local/lib/python3.8/site-packages/adlfs/spec.py:1808: RuntimeWarning: coroutine 'AsyncStorageAccountHostsMixin.close' was never awaited
  pass
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

hayesgb commented Mar 8, 2021

Thanks for the help, @ldacey. It looks like it's related to the finalizer and maybe_sync. It appears maybe_sync is running the finalizer's close operation as if it's synchronous. Would you mind trying it again, but when instantiating the AzureBlobFileSystem, setting the parameter asynchronous=True?

ldacey commented Mar 8, 2021

Sure. Hm, I ran into an immediate error when I turned async on.

fs = adlfs.AzureBlobFileSystem(protocol="abfs", account_name=name, account_key=key, asynchronous=True)

asynchronous=False:
{'name': 'dev', 'size': None, 'type': 'directory'}

asynchronous=True:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-22-b34d79647643> in <module>
----> 1 fs.info("dev")

~/.local/lib/python3.8/site-packages/adlfs/spec.py in info(self, path, refresh, **kwargs)
    537             fetch_from_azure = True
    538         if fetch_from_azure:
--> 539             return maybe_sync(self._info, self, path, refresh)
    540         return super().info(path)
    541 

~/.local/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     91         if inspect.iscoroutinefunction(func):
     92             # run coroutine while pausing this one (because we are within async)
---> 93             return _run_until_done(func(*args, **kwargs))
     94         else:
     95             # make awaitable which then calls the blocking function

~/.local/lib/python3.8/site-packages/fsspec/asyn.py in _run_until_done(coro)
     25     loop = asyncio.get_event_loop()
     26     task = asyncio.current_task()
---> 27     asyncio.tasks._unregister_task(task)
     28     del asyncio.tasks._current_tasks[loop]
     29     runner = loop.create_task(coro)

/opt/conda/lib/python3.8/_weakrefset.py in discard(self, item)
    112         if self._pending_removals:
    113             self._commit_removals()
--> 114         self.data.discard(ref(item))
    115 
    116     def update(self, other):

TypeError: cannot create weak reference to 'NoneType' object

Here are the libraries I am using:

pyarrow 3.0.0
adlfs v0.6.3+20.g19136f9
adal 1.2.6
fsspec 0.8.5

hayesgb commented Mar 9, 2021

I'm going to close this issue with #193 and move the discussion about the finalizer over to #186.
