
Multiprocessed dataset builder #5107

Merged (46 commits) on Nov 9, 2022

Conversation

TevenLeScao (Contributor)

This PR adds the multiprocessing part of #2650 (but not the caching of already-computed arrow files). On the loading side, reading sharded arrow files still needs to be implemented (sharded parquet files can already be loaded).
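
As a usage sketch (assuming the final API exposes num_proc on load_dataset and download_and_prepare as this PR intends; the dataset path is a placeholder):

from datasets import load_dataset

# Hypothetical usage: preparation work is dispatched across processes,
# one or more shards (items of the gen_kwargs lists) per process.
ds = load_dataset("path/to/my_dataset", num_proc=4)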

TevenLeScao (Contributor, Author)

I would also like to add a test, but I am not sure whether it should go into test_builder (more natural imo) or test_load (which already contains a lot of the things I have to import for my current testing setup). For reference, the script I run to check that it works looks like this:

import os
from pathlib import Path
import shutil

import datasets
from datasets.builder import DatasetBuilder
from datasets.features import Features, Value

DATASET_LOADING_SCRIPT_NAME = "__dummy_dataset1__"

DATASET_LOADING_SCRIPT_CODE = """
import os

import datasets
from datasets import DatasetInfo, Features, Split, SplitGenerator, Value


class __DummyDataset1__(datasets.GeneratorBasedBuilder):

    def _info(self) -> DatasetInfo:
        return DatasetInfo(features=Features({"text": Value("string")}))

    def _split_generators(self, dl_manager):
        return [
            SplitGenerator(Split.TRAIN, gen_kwargs={"filepaths": [os.path.join(dl_manager.manual_dir, "train1.txt"), os.path.join(dl_manager.manual_dir, "train2.txt")]}),
            SplitGenerator(Split.TEST, gen_kwargs={"filepaths": [os.path.join(dl_manager.manual_dir, "test.txt")]}),
        ]

    def _generate_examples(self, filepaths, **kwargs):
        idx = 0
        for filepath in filepaths:
            with open(filepath, "r", encoding="utf-8") as f:
                for line in f:
                    yield idx, {"text": line.strip()}
                    idx += 1
"""


def dataset_loading_script_dir(tmp_path):
    # Write the dummy dataset loading script to a temporary directory.
    script_name = DATASET_LOADING_SCRIPT_NAME
    script_dir = tmp_path / script_name
    script_dir.mkdir()
    script_path = script_dir / f"{script_name}.py"
    with open(script_path, "w") as f:
        f.write(DATASET_LOADING_SCRIPT_CODE)
    return str(script_dir)


def data_dir(tmp_path):
    # Create the manual data files: two train shards and one test shard.
    data_dir = tmp_path / "data_dir"
    data_dir.mkdir()
    with open(data_dir / "train1.txt", "w") as f:
        f.write("foo\n" * 10)
    with open(data_dir / "train2.txt", "w") as f:
        f.write("foo\n" * 10)
    with open(data_dir / "test.txt", "w") as f:
        f.write("bar\n" * 10)
    return str(data_dir)


def load_dataset_builder_multiprocessed(tmp_path):
    # Load the builder from the dummy script and prepare it with two processes.
    builder = datasets.load_dataset_builder(
        os.path.join(dataset_loading_script_dir(tmp_path), DATASET_LOADING_SCRIPT_NAME + ".py"),
        data_dir=data_dir(tmp_path),
    )
    assert isinstance(builder, DatasetBuilder)
    assert builder.name == DATASET_LOADING_SCRIPT_NAME
    assert builder.info.features == Features({"text": Value("string")})
    # A small max_shard_size forces several shards per process; num_proc=2 exercises multiprocessing.
    builder.download_and_prepare(tmp_path / "prepare_target", max_shard_size=500, num_proc=2)

if __name__ == "__main__":
    tmp_path = "tmp"
    if os.path.exists(tmp_path):
        raise FileExistsError(f"path {tmp_path} already exists")
    os.makedirs(tmp_path)
    try:
        load_dataset_builder_multiprocessed(Path(tmp_path))
    finally:
        shutil.rmtree(tmp_path)

@HuggingFaceDocBuilderDev

The docs for this PR live here. All of your documentation changes will be reflected on that endpoint.

lhoestq (Member) commented Oct 13, 2022

Nice! I think the test can go in test_builder.py :)
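
A minimal pytest version could look like this (a sketch, assuming the helpers from the snippet above are available in the test module; the test name is hypothetical):

# test_builder.py (sketch): reuses DATASET_LOADING_SCRIPT_NAME,
# dataset_loading_script_dir and data_dir from the snippet above.
import os

import datasets
from datasets.builder import DatasetBuilder
from datasets.features import Features, Value


def test_download_and_prepare_multiprocessed(tmp_path):
    # tmp_path is pytest's built-in temporary-directory fixture.
    builder = datasets.load_dataset_builder(
        os.path.join(dataset_loading_script_dir(tmp_path), DATASET_LOADING_SCRIPT_NAME + ".py"),
        data_dir=data_dir(tmp_path),
    )
    assert isinstance(builder, DatasetBuilder)
    assert builder.info.features == Features({"text": Value("string")})
    # A small max_shard_size forces several shards per worker process.
    builder.download_and_prepare(tmp_path / "prepare_target", max_shard_size=500, num_proc=2)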

lhoestq (Member) left a comment

Thanks for doing it for both the GeneratorBasedBuilder and the ArrowBasedBuilder!

I added a comment and a question about the dataset order:

src/datasets/iterable_dataset.py (outdated, resolved)
Comment on lines 1617 to 1624
# should rename everything at the end, scheme still TBD
def _rename_shard(shard_id_and_rank: Tuple[int]):
    shard_id, rank = shard_id_and_rank
    # Flatten (rank, shard_id) into a global shard index by counting all
    # shards written by lower-ranked processes.
    global_shard_id = sum(shards_per_rank[:rank]) + shard_id
    # Rename the per-process file (RRRRR = rank, SSSSS = shard id within
    # that rank) to the final flat scheme (global shard id out of NNNNN
    # total shards).
    self._rename(
        fpath.replace("SSSSS", f"{shard_id:05d}").replace("RRRRR", f"{rank:05d}"),
        fpath.replace("RRRRR-SSSSS", f"{global_shard_id:05d}").replace("NNNNN", f"{total_shards:05d}"),
    )
lhoestq (Member), Oct 13, 2022

Does this preserve the order of the original dataset? If so that's amazing :)

TevenLeScao (Contributor, Author)

It does! Or at least, this preserves the order of the shards in split_generator.gen_kwargs.

TevenLeScao (Contributor, Author)

Actually, after testing, it doesn't, but I can't quite figure out why :/

TevenLeScao (Contributor, Author) commented Oct 19, 2022

I've added sharded arrow dataset loading. Two WIP items remain in the PR:

  • Order is not preserved (it seems the sharded files are read in the wrong order)
  • The tqdm for preparing the splits is misleading (it compares against the size of the whole split rather than the size of the multiprocessing shard, and I am not sure how to access the latter)

Also, naming.filenames_for_dataset_split is not very elegant imo.

@lvwerra it's functional for now if you don't care about order; I do care, though, so I'd still quite like to get to the bottom of this.

TevenLeScao (Contributor, Author)

Found the ordering bug! (glob.glob returns files in arbitrary order.)
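
A minimal sketch of the resulting fix (assuming the loader lists shard files with glob; the pattern below is a placeholder):

import glob

# glob.glob makes no ordering guarantee, so shard files can come back in
# an arbitrary, filesystem-dependent order. Sorting the paths restores
# the deterministic numbering encoded in the filenames.
shard_files = sorted(glob.glob("output_dir/dataset-train-*-of-*.arrow"))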

TevenLeScao (Contributor, Author)

I fixed the tqdm to be less misleading, but it can't tell where to stop. I am a bit hesitant to add a top-level tqdm (on the shard iterator) since for most intents and purposes it will jump straight from 0 to N shards, and I am not sure of the best way to present that info here.

lhoestq (Member) commented Oct 20, 2022

I'm continuing the PR :)

lhoestq (Member) commented Nov 2, 2022

Alright, this is ready for review - sorry it ended up so big ^^'

If I can do anything to make this PR easier for you to review, @mariosasko, let me know.

lhoestq (Member) commented Nov 3, 2022

Multiprocessing is disabled by default, but we may want to show a warning to encourage users to pass num_proc if the dataset is split into many files. Let me know what you think.
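
A sketch of what such a warning might look like (hypothetical: the threshold, wording, and variable names are placeholders, not the PR's actual code):

import logging

logger = logging.getLogger(__name__)

# Hypothetical check inside the builder, run before sequential preparation.
num_shards = 64   # placeholder: number of items in the split's gen_kwargs lists
num_proc = None   # placeholder: the value the user passed (None by default)
if num_proc is None and num_shards > 1:
    logger.warning(
        f"This dataset is split into {num_shards} files. "
        "Pass num_proc to load_dataset/download_and_prepare to prepare it with multiprocessing."
    )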

mariosasko (Collaborator) left a comment

Looks great!

Do we have some benchmarks to see the speed-up?

Some nits:

Resolved review threads: docs/source/dataset_script.mdx (×4), src/datasets/arrow_writer.py, src/datasets/builder.py (×4)
parsa-ra commented Nov 6, 2022

Hey, does this error seem normal to you guys?

I built the package from commit 0d4e3907; here is the version displayed after import ...

>>> datasets.__version__
'2.6.1.dev0'
>>> 
>>> data = load_dataset('dataset_loaders/rfw2latentplay', num_proc=14)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/somewhere//mambaforge/envs/datasets/lib/python3.8/site-packages/datasets/load.py", line 1719, in load_dataset
    builder_instance = load_dataset_builder(
  File "/somewhere//mambaforge/envs/datasets/lib/python3.8/site-packages/datasets/load.py", line 1523, in load_dataset_builder
    builder_instance: DatasetBuilder = builder_cls(
  File "/somewhere//mambaforge/envs/datasets/lib/python3.8/site-packages/datasets/builder.py", line 1292, in __init__
    super().__init__(*args, **kwargs)
  File "/somewhere//mambaforge/envs/datasets/lib/python3.8/site-packages/datasets/builder.py", line 303, in __init__
    self.config, self.config_id = self._create_builder_config(
  File "/somewhere//mambaforge/envs/datasets/lib/python3.8/site-packages/datasets/builder.py", line 456, in _create_builder_config
    builder_config = self.BUILDER_CONFIG_CLASS(**config_kwargs)
TypeError: __init__() got an unexpected keyword argument 'num_proc'

Let me know if I can help fix this ...

lhoestq (Member) commented Nov 7, 2022

Do we have some benchmarks to see the speed-up?

On my machine, running load_dataset("oscar-corpus/OSCAR-2201", "br") (which is split into shards) goes from 2-3k examples per sec to 4-5k examples per sec with num_proc=2 😉
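
For reference, the benchmark call would be something like this sketch (access requirements for the OSCAR repo, if any, are omitted):

from datasets import load_dataset

# OSCAR-2201 "br" is distributed as multiple shards, so two worker
# processes can prepare roughly half of the shards each.
ds = load_dataset("oscar-corpus/OSCAR-2201", "br", num_proc=2)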

lhoestq (Member) commented Nov 7, 2022

Hey, does this error seem normal to you guys?

I built the package from commit 0d4e390; here is the version displayed after import ...

I don't know where you got the 0d4e3907 commit from; it doesn't seem to be in this PR. You should try installing from this PR, or wait for it to be merged into main.

parsa-ra commented Nov 9, 2022

Splits vs Shards

Maybe it's a good idea to add some documentation on the sharding that can be achieved by passing list-based arguments to the SplitGenerator's gen_kwargs (see the sketch below) ...

I had to read the whole dataset generation source code to figure this out ...
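
A minimal sketch of the pattern in question (hypothetical script; each item of the filepaths list becomes one shard that can be dispatched to a worker process):

import datasets
from datasets import DatasetInfo, Features, Split, SplitGenerator, Value


class MyShardedDataset(datasets.GeneratorBasedBuilder):
    def _info(self) -> DatasetInfo:
        return DatasetInfo(features=Features({"text": Value("string")}))

    def _split_generators(self, dl_manager):
        # Passing a list in gen_kwargs is what enables sharding: with
        # num_proc > 1, the list is split across worker processes, and
        # each process generates examples from its own subset of items.
        return [
            SplitGenerator(
                Split.TRAIN,
                gen_kwargs={"filepaths": ["shard0.txt", "shard1.txt", "shard2.txt"]},
            )
        ]

    def _generate_examples(self, filepaths):
        idx = 0
        for filepath in filepaths:
            with open(filepath, encoding="utf-8") as f:
                for line in f:
                    yield idx, {"text": line.strip()}
                    idx += 1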

lhoestq (Member) commented Nov 9, 2022

Maybe it's a good idea to add some documentation on the sharding that can be achieved by passing list-based arguments to the SplitGenerator's gen_kwargs ...

This is part of this PR :) You can check the changes in docs/source/dataset_script.mdx.

lhoestq (Member) commented Nov 9, 2022

I took your comments into account, @mariosasko, thanks!
Let me know if it's good for you now ;)

mariosasko (Collaborator) left a comment

Looks all good now (besides the failing "PR docs" job, but I'm not sure we need to address that)!

lhoestq (Member) commented Nov 9, 2022

The doc CI should be fixed by now, hopefully. Merging!

lhoestq merged commit 2945690 into huggingface:main on Nov 9, 2022
lhoestq changed the title Multiprocessed dataset builder → [WIP] Multiprocessed dataset builder on Dec 1, 2022