Sharded save_to_disk + multiprocessing #5268

Merged
merged 31 commits into main from sharded-save_to_disk on Dec 14, 2022
Changes from 13 commits
Commits (31)
beef55e
add num_shards, num_proc, storage_options to save_to_disk
lhoestq Nov 17, 2022
d1b7fb5
minor
lhoestq Nov 17, 2022
2d59bb6
add tests
lhoestq Nov 18, 2022
2e270dc
remove old s3fs integreation tests
lhoestq Nov 18, 2022
532ae18
style
lhoestq Nov 18, 2022
05436cc
Merge branch 'main' into sharded-save_to_disk
lhoestq Nov 18, 2022
f548e01
style
lhoestq Nov 18, 2022
dcd6363
Update DatasetDict.save_to_disk
lhoestq Nov 21, 2022
26a3e15
test dataset dict
lhoestq Nov 21, 2022
291a883
update dataset dict load_from_disk
lhoestq Nov 21, 2022
c55028b
minor
lhoestq Nov 21, 2022
f122f6d
update test
lhoestq Nov 21, 2022
8305f8c
update docs
lhoestq Nov 21, 2022
d1d8ef8
backport to_reader to pyarrow < 8
lhoestq Nov 22, 2022
7057792
typo
lhoestq Nov 22, 2022
5e737c0
support both max_shard_size and num_shards
lhoestq Dec 7, 2022
598b9da
style
lhoestq Dec 7, 2022
24e24bf
docstrings
lhoestq Dec 7, 2022
16bb14c
Merge branch 'main' into sharded-save_to_disk
lhoestq Dec 8, 2022
917f921
Merge branch 'main' into sharded-save_to_disk
lhoestq Dec 9, 2022
75347aa
test _estimate_nbytes
lhoestq Dec 9, 2022
f86ed9f
Merge branch 'main' into sharded-save_to_disk
lhoestq Dec 12, 2022
fc39b83
add test for num_shards
lhoestq Dec 12, 2022
a103ff0
Merge branch 'main' into sharded-save_to_disk
lhoestq Dec 13, 2022
d004f58
style
lhoestq Dec 13, 2022
5b36d97
Merge branch 'main' into sharded-save_to_disk
lhoestq Dec 14, 2022
c1db7bd
mario's comment
lhoestq Dec 14, 2022
f3562d2
add config.PBAR_REFRESH_TIME_INTERVAL
lhoestq Dec 14, 2022
c2b38fa
fix docstrings
lhoestq Dec 14, 2022
ce66732
use kwargs_iterable in iflatmap_unordered
lhoestq Dec 14, 2022
44e5156
fix tests
lhoestq Dec 14, 2022
14 changes: 7 additions & 7 deletions docs/source/filesystems.mdx
@@ -19,10 +19,10 @@ Here are examples for S3, Google Cloud Storage and Azure Blob Storage.

### Amazon S3

1. Install the S3 dependency with 🤗 Datasets:
1. Install the S3 FileSystem implementation:

```
>>> pip install datasets[s3]
>>> pip install s3fs
```

2. Define your credentials
@@ -89,7 +89,7 @@ Otherwise, include your `aws_access_key_id` and `aws_secret_access_key` whenever
```py
>>> storage_options = {"anon": True} # for anonymous connection
# or use your credentials
>>> storage_options = {"account_name": ACCOUNT_NAME, "account_key": ACCOUNT_KEY) # gen 2 filesystem
>>> storage_options = {"account_name": ACCOUNT_NAME, "account_key": ACCOUNT_KEY} # gen 2 filesystem
# or use your credentials with the gen 1 filesystem
>>> storage_options={"tenant_id": TENANT_ID, "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
```
@@ -173,11 +173,11 @@ After you have processed your dataset, you can save it to your cloud storage with

```py
# saves encoded_dataset to amazon s3
>>> encoded_dataset.save_to_disk("s3://my-private-datasets/imdb/train", fs=fs)
>>> encoded_dataset.save_to_disk("s3://my-private-datasets/imdb/train", storage_options=storage_options)
# saves encoded_dataset to google cloud storage
>>> encoded_dataset.save_to_disk("gcs://my-private-datasets/imdb/train", fs=fs)
>>> encoded_dataset.save_to_disk("gcs://my-private-datasets/imdb/train", storage_options=storage_options)
# saves encoded_dataset to microsoft azure blob/datalake
>>> encoded_dataset.save_to_disk("adl://my-private-datasets/imdb/train", fs=fs)
>>> encoded_dataset.save_to_disk("adl://my-private-datasets/imdb/train", storage_options=storage_options)
```

<Tip>
@@ -202,7 +202,7 @@ When you are ready to use your dataset again, reload it with [`Dataset.load_from_disk`]:
```py
>>> from datasets import load_from_disk
# load encoded_dataset from cloud storage
>>> dataset = load_from_disk("s3://a-public-datasets/imdb/train", fs=fs)
>>> dataset = load_from_disk("s3://a-public-datasets/imdb/train", storage_options=storage_options)
>>> print(len(dataset))
25000
```
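Taken together, these doc changes replace the old `fs=fs` argument with `storage_options`. Below is a minimal sketch of the resulting round trip; the s3fs-style `key`/`secret` credential names are an assumption, and the bucket path simply reuses the placeholder from the docs above.

```python
from datasets import load_dataset, load_from_disk

# Placeholder credentials using s3fs option names (assumption, not from this PR).
storage_options = {"key": "<aws_access_key_id>", "secret": "<aws_secret_access_key>"}

encoded_dataset = load_dataset("imdb", split="train")

# Save to cloud storage by passing storage_options instead of a filesystem instance.
encoded_dataset.save_to_disk("s3://my-private-datasets/imdb/train", storage_options=storage_options)

# Reload it later with the same options.
dataset = load_from_disk("s3://my-private-datasets/imdb/train", storage_options=storage_options)
print(len(dataset))
```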
14 changes: 2 additions & 12 deletions setup.py
@@ -85,7 +85,7 @@
"importlib_metadata;python_version<'3.8'",
# to save datasets locally or on any filesystem
# minimum 2021.11.1 so that BlockSizeError is fixed: see https://github.com/fsspec/filesystem_spec/pull/830
"fsspec[http]>=2021.11.1", # aligned s3fs with this
"fsspec[http]>=2021.11.1",
# for data streaming via http
"aiohttp",
# To get datasets from the Datasets Hub on huggingface.co
@@ -122,13 +122,8 @@
# optional dependencies
"apache-beam>=2.26.0",
"elasticsearch<8.0.0", # 8.0 asks users to provide hosts or cloud_id when instantiating ElasticSearch()
"aiobotocore>=2.0.1", # required by s3fs>=2021.11.1
"boto3>=1.19.8", # to be compatible with aiobotocore>=2.0.1 - both have strong dependencies on botocore
"botocore>=1.22.8", # to be compatible with aiobotocore and boto3
"faiss-cpu>=1.6.4",
"fsspec[s3]",
"lz4",
"moto[s3,server]==2.0.4",
"py7zr",
"rarfile>=4.0",
"s3fs>=2021.11.1", # aligned with fsspec[http]>=2021.11.1
@@ -181,12 +176,7 @@
],
"tensorflow_gpu": ["tensorflow-gpu>=2.2.0,!=2.6.0,!=2.6.1"],
"torch": ["torch"],
"s3": [
"fsspec",
"boto3",
"botocore",
"s3fs",
],
"s3": ["s3fs"],
"streaming": [], # for backward compatibility
"dev": TESTS_REQUIRE + QUALITY_REQUIRE,
"tests": TESTS_REQUIRE,
227 changes: 166 additions & 61 deletions src/datasets/arrow_dataset.py

Large diffs are not rendered by default.
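The `arrow_dataset.py` diff is not rendered here, but the commit messages ("add num_shards, num_proc, storage_options to save_to_disk", "support both max_shard_size and num_shards") describe the new `Dataset.save_to_disk` parameters. A hedged sketch of how a call might look; paths and values are illustrative, not taken from the PR.

```python
from datasets import Dataset

ds = Dataset.from_dict({"text": ["a", "b", "c", "d"] * 1000})

# Write the dataset as 8 shards, saving 4 shards in parallel (illustrative values).
ds.save_to_disk("path/to/my_dataset", num_shards=8, num_proc=4)

# Size-based sharding; the commit log says max_shard_size is supported alongside
# num_shards (assumed here to be mutually exclusive with it).
ds.save_to_disk("path/to/my_dataset_by_size", max_shard_size="500MB")
```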

2 changes: 1 addition & 1 deletion src/datasets/builder.py
@@ -647,7 +647,7 @@ def download_and_prepare(
Multiprocessing is disabled by default.

<Added version="2.7.0"/>
storage_options (:obj:`dict`, *optional*): Key/value pairs to be passed on to the caching file-system backend, if any.
storage_options (:obj:`dict`, *optional*): Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.5.0"/>
**download_and_prepare_kwargs (additional keyword arguments): Keyword arguments.
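For reference, `storage_options` in `download_and_prepare` applies to whichever filesystem backs the output directory, which the reworded docstring reflects. A minimal sketch, assuming an S3 output path and s3fs-style credential names as placeholders.

```python
from datasets import load_dataset_builder

builder = load_dataset_builder("imdb")

# Prepare the dataset directly onto a remote filesystem; the bucket and the
# credential names below are placeholders for illustration.
builder.download_and_prepare(
    "s3://my-bucket/imdb-prepared",
    storage_options={"key": "<aws_access_key_id>", "secret": "<aws_secret_access_key>"},
)
```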
114 changes: 78 additions & 36 deletions src/datasets/dataset_dict.py
@@ -2,6 +2,7 @@
import copy
import json
import os
import posixpath
import re
import warnings
from io import BytesIO
@@ -1032,70 +1033,98 @@ def shuffle(
}
)

def save_to_disk(self, dataset_dict_path: str, fs=None):
def save_to_disk(
self,
dataset_dict_path: PathLike,
fs="deprecated",
num_shards: Optional[Dict[str, int]] = None,
num_proc: Optional[int] = None,
storage_options: Optional[dict] = None,
):
"""
Saves a dataset dict to a filesystem using either :class:`~filesystems.S3FileSystem` or
``fsspec.spec.AbstractFileSystem``.

For :class:`Image` and :class:`Audio` data:

If your images and audio files are local files, then the resulting arrow file will store paths to these files.
If you want to include the bytes or your images or audio files instead, you must `read()` those files first.
This can be done by storing the "bytes" instead of the "path" of the images or audio files:

```python
>>> def read_image_file(example):
... with open(example["image"].filename, "rb") as f:
... return {"image": {"bytes": f.read()}}
>>> ds = ds.map(read_image_file)
>>> ds.save_to_disk("path/to/dataset/dir")
```

```python
>>> def read_audio_file(example):
... with open(example["audio"]["path"], "rb") as f:
... return {"audio": {"bytes": f.read()}}
>>> ds = ds.map(read_audio_file)
>>> ds.save_to_disk("path/to/dataset/dir")
```
All the Image() and Audio() data are stored in the arrow files.
If you want to store paths or urls, please use the Value("string") type.

Args:
dataset_dict_path (``str``): Path (e.g. `dataset/train`) or remote URI
dataset_dict_path (``PathLike``): Path (e.g. `path/to/dataset`) or remote URI
(e.g. `s3://my-bucket/dataset/train`) of the dataset dict directory where the dataset dict will be
saved to.
fs (:class:`~filesystems.S3FileSystem`, ``fsspec.spec.AbstractFileSystem``, optional, defaults ``None``):
Instance of the remote filesystem used to download the files from.
num_shards (:obj:`Dict[str, int]`, optional): Number of shards to write.
You need to provide the number of shards for each dataset in the dataset dictionary.
Defaults to the same value as `num_proc` if specified.

<Added version="2.8.0"/>
num_proc (:obj:`int`, optional, default `None`): Number of processes when downloading and generating the dataset locally.
Multiprocessing is disabled by default.

<Added version="2.8.0"/>
storage_options (:obj:`dict`, *optional*): Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.8.0"/>

"""
if is_remote_filesystem(fs):
dest_dataset_dict_path = extract_path_from_uri(dataset_dict_path)
else:
fs = fsspec.filesystem("file")
dest_dataset_dict_path = dataset_dict_path
os.makedirs(dest_dataset_dict_path, exist_ok=True)
if fs != "deprecated":
warnings.warn(
"'fs' was is deprecated in favor of 'storage_options' in version 2.8.0 and will be removed in 3.0.0.\n"
"You can remove this warning by passing 'storage_options=fs.storage_options' instead.",
FutureWarning,
)
storage_options = fs.storage_options

fs_token_paths = fsspec.get_fs_token_paths(dataset_dict_path, storage_options=storage_options)
fs: fsspec.AbstractFileSystem = fs_token_paths[0]
is_local = not is_remote_filesystem(fs)
path_join = os.path.join if is_local else posixpath.join

if num_shards is None:
num_shards = {k: None for k in self}
elif not isinstance(num_shards, dict):
raise ValueError(
"Please provide one `num_shards` per dataset in the dataset dictionary, e.g. {{'train': 128, 'test': 4}}"
)

if is_local:
Path(dataset_dict_path).resolve().mkdir(parents=True, exist_ok=True)

json.dump(
{"splits": list(self)},
fs.open(Path(dest_dataset_dict_path, config.DATASETDICT_JSON_FILENAME).as_posix(), "w", encoding="utf-8"),
fs.open(path_join(dataset_dict_path, config.DATASETDICT_JSON_FILENAME), "w", encoding="utf-8"),
)
for k, dataset in self.items():
dataset.save_to_disk(Path(dest_dataset_dict_path, k).as_posix(), fs)
dataset.save_to_disk(
path_join(dataset_dict_path, k),
num_shards=num_shards.get(k),
num_proc=num_proc,
storage_options=storage_options,
)

@staticmethod
def load_from_disk(dataset_dict_path: str, fs=None, keep_in_memory: Optional[bool] = None) -> "DatasetDict":
def load_from_disk(
dataset_dict_path: PathLike,
fs="deprecated",
keep_in_memory: Optional[bool] = None,
storage_options: Optional[dict] = None,
) -> "DatasetDict":
"""
Load a dataset that was previously saved using :meth:`save_to_disk` from a filesystem using either
:class:`~filesystems.S3FileSystem` or ``fsspec.spec.AbstractFileSystem``.

Args:
dataset_dict_path (:obj:`str`): Path (e.g. ``"dataset/train"``) or remote URI (e.g.
dataset_dict_path (``PathLike``): Path (e.g. ``"path/to/dataset"``) or remote URI (e.g.
``"s3//my-bucket/dataset/train"``) of the dataset dict directory where the dataset dict will be loaded
from.
fs (:class:`~filesystems.S3FileSystem` or ``fsspec.spec.AbstractFileSystem``, optional, default ``None``):
Instance of the remote filesystem used to download the files from.
keep_in_memory (:obj:`bool`, default ``None``): Whether to copy the dataset in-memory. If `None`, the
dataset will not be copied in-memory unless explicitly enabled by setting
`datasets.config.IN_MEMORY_MAX_SIZE` to nonzero. See more details in the
:ref:`load_dataset_enhancing_performance` section.
storage_options (:obj:`dict`, *optional*): Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.8.0"/>

Returns:
:class:`DatasetDict`
@@ -1106,6 +1135,17 @@ def load_from_disk(dataset_dict_path: str, fs=None, keep_in_memory: Optional[bool] = None) -> "DatasetDict":
>>> ds = load_from_disk('path/to/dataset/directory')
```
"""
if fs != "deprecated":
warnings.warn(
"'fs' was is deprecated in favor of 'storage_options' in version 2.8.0 and will be removed in 3.0.0.\n"
"You can remove this warning by passing 'storage_options=fs.storage_options' instead.",
FutureWarning,
)
storage_options = fs.storage_options

fs_token_paths = fsspec.get_fs_token_paths(dataset_dict_path, storage_options=storage_options)
fs: fsspec.AbstractFileSystem = fs_token_paths[0]

dataset_dict = DatasetDict()
if is_remote_filesystem(fs):
dest_dataset_dict_path = extract_path_from_uri(dataset_dict_path)
@@ -1124,7 +1164,9 @@ def load_from_disk(dataset_dict_path: str, fs=None, keep_in_memory: Optional[bool] = None) -> "DatasetDict":
if is_remote_filesystem(fs)
else Path(dest_dataset_dict_path, k).as_posix()
)
dataset_dict[k] = Dataset.load_from_disk(dataset_dict_split_path, fs, keep_in_memory=keep_in_memory)
dataset_dict[k] = Dataset.load_from_disk(
dataset_dict_split_path, keep_in_memory=keep_in_memory, storage_options=storage_options
)
return dataset_dict

@staticmethod
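With the changes above, `DatasetDict.save_to_disk` takes a per-split `num_shards` mapping plus the shared `num_proc` and `storage_options`. A short sketch reusing the docstring's example shard counts; the dataset and output path are illustrative.

```python
from datasets import load_dataset, load_from_disk

dset_dict = load_dataset("imdb")  # a DatasetDict

# One num_shards entry per split you want to control, as in the docstring example;
# the code above uses num_shards.get(k), so unspecified splits fall back to the
# num_proc-based default.
dset_dict.save_to_disk(
    "path/to/imdb",
    num_shards={"train": 128, "test": 4},
    num_proc=4,
)

reloaded = load_from_disk("path/to/imdb")
```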
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -4,7 +4,7 @@


# Import fixture modules as plugins
pytest_plugins = ["tests.fixtures.files", "tests.fixtures.hub", "tests.fixtures.s3", "tests.fixtures.fsspec"]
pytest_plugins = ["tests.fixtures.files", "tests.fixtures.hub", "tests.fixtures.fsspec"]


def pytest_collection_modifyitems(config, items):
2 changes: 1 addition & 1 deletion tests/fixtures/fsspec.py
@@ -81,4 +81,4 @@ def mock_fsspec(monkeypatch):
@pytest.fixture
def mockfs(tmp_path_factory, mock_fsspec):
local_fs_dir = tmp_path_factory.mktemp("mockfs")
return MockFileSystem(local_root_dir=local_fs_dir)
return MockFileSystem(local_root_dir=local_fs_dir, auto_mkdir=True)
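The `auto_mkdir=True` change lets the mock filesystem create nested directories, which a save to a `mock://` URI relies on. The following is a rough sketch of the kind of test this enables, not the PR's actual test; the dataset, paths, and assertions are illustrative.

```python
from datasets import Dataset, load_from_disk


def test_save_and_load_to_mock_filesystem(mockfs):
    # mockfs is the fixture above: an fsspec filesystem registered for "mock://"
    # that now creates missing parent directories automatically.
    ds = Dataset.from_dict({"text": ["a", "b", "c", "d"]})
    ds.save_to_disk("mock://my_dataset", storage_options=mockfs.storage_options)
    assert mockfs.isdir("my_dataset")

    reloaded = load_from_disk("mock://my_dataset", storage_options=mockfs.storage_options)
    assert reloaded.to_dict() == ds.to_dict()
```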
74 changes: 0 additions & 74 deletions tests/fixtures/s3.py

This file was deleted.
