Add Parquet loader + from_parquet and to_parquet #2537

Merged
merged 11 commits on Jun 30, 2021
6 changes: 3 additions & 3 deletions .github/workflows/benchmarks.yaml
@@ -16,8 +16,8 @@ jobs:
pip install setuptools wheel
pip install -e .[benchmarks]

# pyarrow==1.0.0
pip install pyarrow==1.0.0
# pyarrow==3.0.0
pip install pyarrow==3.0.0

dvc repro --force

@@ -26,7 +26,7 @@ jobs:

python ./benchmarks/format.py report.json report.md

echo "<details>\n<summary>Show benchmarks</summary>\n\nPyArrow==1.0.0\n" > final_report.md
echo "<details>\n<summary>Show benchmarks</summary>\n\nPyArrow==3.0.0\n" > final_report.md
cat report.md >> final_report.md

# pyarrow
Binary file added datasets/parquet/dummy/0.0.0/dummy_data.zip
7 changes: 3 additions & 4 deletions docs/source/package_reference/main_classes.rst
@@ -25,16 +25,15 @@ The base class :class:`datasets.Dataset` implements a Dataset backed by an Apach
__getitem__, cleanup_cache_files,
map, filter, select, sort, shuffle, train_test_split, shard, export,
save_to_disk, load_from_disk, flatten_indices,
to_csv, to_pandas, to_dict,
to_csv, to_pandas, to_dict, to_json, to_parquet,
add_faiss_index, add_faiss_index_from_external_arrays, save_faiss_index, load_faiss_index,
add_elasticsearch_index, load_elasticsearch_index,
list_indexes, get_index, drop_index, search, search_batch, get_nearest_examples, get_nearest_examples_batch,
info, split, builder_name, citation, config_name, dataset_size,
description, download_checksums, download_size, features, homepage,
license, size_in_bytes, supervised_keys, version,
from_csv, from_json, from_text,
from_csv, from_json, from_parquet, from_text,
prepare_for_task, align_labels_with_mapping,
to_json,

.. autofunction:: datasets.concatenate_datasets

@@ -56,7 +55,7 @@ It also has dataset transform methods like map or filter, to process all the spl
flatten_, cast_, remove_columns_, rename_column_,
flatten, cast, remove_columns, rename_column, class_encode_column,
save_to_disk, load_from_disk,
from_csv, from_json, from_text,
from_csv, from_json, from_parquet, from_text,
prepare_for_task, align_labels_with_mapping


2 changes: 1 addition & 1 deletion setup.py
@@ -73,7 +73,7 @@
# We use numpy>=1.17 to have np.random.Generator (Dataset shuffling)
"numpy>=1.17",
# Backend and serialization.
# Minimum 1.0.0 to avoid permission errors on windows when using the compute layer on memory mapped data
# Minimum 3.0.0 to support mix of struct and list types in parquet, and batch iterators of parquet data
# pyarrow 4.0.0 introduced segfault bug, see: https://github.com/huggingface/datasets/pull/2268
"pyarrow>=1.0.0,!=4.0.0",
# For smart caching dataset processing
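
As context for the updated comment: it points to pyarrow 3.0.0's batch iteration over Parquet data, which the new loader relies on. A minimal sketch of that API, assuming pyarrow>=3.0.0 is installed (the file path is a placeholder, not part of this PR):

import pyarrow.parquet as pq

# Stream a Parquet file in record batches instead of materializing it whole;
# ParquetFile.iter_batches is the pyarrow>=3.0.0 feature the comment refers to.
parquet_file = pq.ParquetFile("data.parquet")
for record_batch in parquet_file.iter_batches(batch_size=10_000):
    print(record_batch.num_rows, record_batch.schema.names)
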
67 changes: 64 additions & 3 deletions src/datasets/arrow_dataset.py
@@ -468,7 +468,7 @@ def from_csv(
path_or_paths (path-like or list of path-like): Path(s) of the CSV file(s).
split (:class:`NamedSplit`, optional): Split name to be assigned to the dataset.
features (:class:`Features`, optional): Dataset features.
cache_dir (:obj:`str`, optional, default ``"~/datasets"``): Directory to cache data.
cache_dir (:obj:`str`, optional, default ``"~/.cache/huggingface/datasets"``): Directory to cache data.
keep_in_memory (:obj:`bool`, default ``False``): Whether to copy the data in-memory.
**kwargs: Keyword arguments to be passed to :meth:`pandas.read_csv`.

@@ -498,7 +498,7 @@ def from_json(
path_or_paths (path-like or list of path-like): Path(s) of the JSON or JSON Lines file(s).
split (:class:`NamedSplit`, optional): Split name to be assigned to the dataset.
features (:class:`Features`, optional): Dataset features.
cache_dir (:obj:`str`, optional, default ``"~/datasets"``): Directory to cache data.
cache_dir (:obj:`str`, optional, default ``"~/.cache/huggingface/datasets"``): Directory to cache data.
keep_in_memory (:obj:`bool`, default ``False``): Whether to copy the data in-memory.
            field (:obj:`str`, optional): Field name of the JSON file that contains the dataset.
**kwargs: Keyword arguments to be passed to :class:`JsonConfig`.
@@ -519,6 +519,45 @@ def from_json(
**kwargs,
).read()

@staticmethod
def from_parquet(
path_or_paths: Union[PathLike, List[PathLike]],
split: Optional[NamedSplit] = None,
features: Optional[Features] = None,
cache_dir: str = None,
keep_in_memory: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
):
"""Create Dataset from Parquet file(s).

Args:
path_or_paths (path-like or list of path-like): Path(s) of the Parquet file(s).
split (:class:`NamedSplit`, optional): Split name to be assigned to the dataset.
features (:class:`Features`, optional): Dataset features.
cache_dir (:obj:`str`, optional, default ``"~/.cache/huggingface/datasets"``): Directory to cache data.
keep_in_memory (:obj:`bool`, default ``False``): Whether to copy the data in-memory.
columns (:obj:`List[str]`, optional): If not None, only these columns will be read from the file.
A column name may be a prefix of a nested field, e.g. 'a' will select
'a.b', 'a.c', and 'a.d.e'.
**kwargs: Keyword arguments to be passed to :class:`ParquetConfig`.

Returns:
:class:`Dataset`
"""
# Dynamic import to avoid circular dependency
from .io.parquet import ParquetDatasetReader

return ParquetDatasetReader(
path_or_paths,
split=split,
features=features,
cache_dir=cache_dir,
keep_in_memory=keep_in_memory,
columns=columns,
**kwargs,
).read()

@staticmethod
def from_text(
path_or_paths: Union[PathLike, List[PathLike]],
@@ -534,7 +573,7 @@ def from_text(
path_or_paths (path-like or list of path-like): Path(s) of the text file(s).
split (:class:`NamedSplit`, optional): Split name to be assigned to the dataset.
features (:class:`Features`, optional): Dataset features.
cache_dir (:obj:`str`, optional, default ``"~/datasets"``): Directory to cache data.
cache_dir (:obj:`str`, optional, default ``"~/.cache/huggingface/datasets"``): Directory to cache data.
keep_in_memory (:obj:`bool`, default ``False``): Whether to copy the data in-memory.
**kwargs: Keyword arguments to be passed to :class:`TextConfig`.

@@ -2862,6 +2901,28 @@ def to_pandas(
for offset in range(0, len(self), batch_size)
)

def to_parquet(
self,
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
**parquet_writer_kwargs,
) -> int:
"""Exports the dataset to parquet

Args:
            path_or_buf (``PathLike`` or ``BinaryIO``): Either a path to a file or a binary file object.
batch_size (Optional ``int``): Size of the batch to load in memory and write at once.
Defaults to :obj:`datasets.config.DEFAULT_MAX_BATCH_SIZE`.
parquet_writer_kwargs: Parameters to pass to PyArrow's :class:`pyarrow.parquet.ParquetWriter`

Returns:
            int: The number of bytes written.
"""
# Dynamic import to avoid circular dependency
from .io.parquet import ParquetDatasetWriter

return ParquetDatasetWriter(self, path_or_buf, batch_size=batch_size, **parquet_writer_kwargs).write()

@transmit_format
@fingerprint_transform(inplace=False)
def add_column(self, name: str, column: Union[list, np.array], new_fingerprint: str):
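
A quick usage sketch of the from_parquet / to_parquet methods added above, assuming they behave as their docstrings describe (file names are placeholders):

from datasets import Dataset

ds = Dataset.from_dict({"text": ["foo", "bar"], "label": [0, 1]})

# Export to a Parquet file; returns the (approximate) number of bytes written.
ds.to_parquet("my_dataset.parquet")

# Reload it, optionally restricting which columns are read.
reloaded = Dataset.from_parquet("my_dataset.parquet", columns=["text"])
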
42 changes: 39 additions & 3 deletions src/datasets/dataset_dict.py
@@ -748,7 +748,7 @@ def from_csv(
Args:
path_or_paths (dict of path-like): Path(s) of the CSV file(s).
features (:class:`Features`, optional): Dataset features.
cache_dir (str, optional, default="~/datasets"): Directory to cache data.
cache_dir (str, optional, default="~/.cache/huggingface/datasets"): Directory to cache data.
keep_in_memory (bool, default=False): Whether to copy the data in-memory.
**kwargs: Keyword arguments to be passed to :meth:`pandas.read_csv`.

@@ -775,7 +775,7 @@ def from_json(
Args:
path_or_paths (path-like or list of path-like): Path(s) of the JSON Lines file(s).
features (:class:`Features`, optional): Dataset features.
cache_dir (str, optional, default="~/datasets"): Directory to cache data.
cache_dir (str, optional, default="~/.cache/huggingface/datasets"): Directory to cache data.
keep_in_memory (bool, default=False): Whether to copy the data in-memory.
**kwargs: Keyword arguments to be passed to :class:`JsonConfig`.

@@ -789,6 +789,42 @@ def from_json(
path_or_paths, features=features, cache_dir=cache_dir, keep_in_memory=keep_in_memory, **kwargs
).read()

@staticmethod
def from_parquet(
path_or_paths: Dict[str, PathLike],
features: Optional[Features] = None,
cache_dir: str = None,
keep_in_memory: bool = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DatasetDict":
"""Create DatasetDict from Parquet file(s).

Args:
            path_or_paths (dict of path-like): Path(s) of the Parquet file(s).
features (:class:`Features`, optional): Dataset features.
cache_dir (str, optional, default="~/.cache/huggingface/datasets"): Directory to cache data.
keep_in_memory (bool, default=False): Whether to copy the data in-memory.
columns (:obj:`List[str]`, optional): If not None, only these columns will be read from the file.
A column name may be a prefix of a nested field, e.g. 'a' will select
'a.b', 'a.c', and 'a.d.e'.
**kwargs: Keyword arguments to be passed to :class:`ParquetConfig`.

Returns:
:class:`DatasetDict`
"""
# Dynamic import to avoid circular dependency
from .io.parquet import ParquetDatasetReader

return ParquetDatasetReader(
path_or_paths,
features=features,
cache_dir=cache_dir,
keep_in_memory=keep_in_memory,
columns=columns,
**kwargs,
).read()

@staticmethod
def from_text(
path_or_paths: Dict[str, PathLike],
@@ -802,7 +838,7 @@ def from_text(
Args:
path_or_paths (dict of path-like): Path(s) of the text file(s).
features (:class:`Features`, optional): Dataset features.
cache_dir (str, optional, default="~/datasets"): Directory to cache data.
cache_dir (str, optional, default="~/.cache/huggingface/datasets"): Directory to cache data.
keep_in_memory (bool, default=False): Whether to copy the data in-memory.
**kwargs: Keyword arguments to be passed to :class:`TextConfig`.

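
And the DatasetDict counterpart added above, sketched with one placeholder Parquet file per split:

from datasets import DatasetDict

dset_dict = DatasetDict.from_parquet(
    {"train": "train.parquet", "test": "test.parquet"},
    columns=["text", "label"],
)
print(dset_dict["train"].num_rows)
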
111 changes: 111 additions & 0 deletions src/datasets/io/parquet.py
@@ -0,0 +1,111 @@
import os
from typing import BinaryIO, Optional, Union

import pyarrow as pa
import pyarrow.parquet as pq
from packaging import version

from .. import Dataset, Features, NamedSplit, config
from ..formatting import query_table
from ..packaged_modules import _PACKAGED_DATASETS_MODULES
from ..packaged_modules.parquet.parquet import Parquet
from ..utils.typing import NestedDataStructureLike, PathLike
from .abc import AbstractDatasetReader


class ParquetDatasetReader(AbstractDatasetReader):
def __init__(
self,
path_or_paths: NestedDataStructureLike[PathLike],
split: Optional[NamedSplit] = None,
features: Optional[Features] = None,
cache_dir: str = None,
keep_in_memory: bool = False,
**kwargs,
):
if version.parse(pa.__version__) < version.parse("3.0.0"):
raise ImportError(
"PyArrow >= 3.0.0 is required to used the ParquetDatasetReader: pip install --upgrade pyarrow"
)
super().__init__(
path_or_paths, split=split, features=features, cache_dir=cache_dir, keep_in_memory=keep_in_memory, **kwargs
)
path_or_paths = path_or_paths if isinstance(path_or_paths, dict) else {self.split: path_or_paths}
hash = _PACKAGED_DATASETS_MODULES["parquet"][1]
self.builder = Parquet(
cache_dir=cache_dir,
data_files=path_or_paths,
features=features,
hash=hash,
**kwargs,
)

def read(self):
download_config = None
download_mode = None
ignore_verifications = False
use_auth_token = None
base_path = None

self.builder.download_and_prepare(
download_config=download_config,
download_mode=download_mode,
ignore_verifications=ignore_verifications,
# try_from_hf_gcs=try_from_hf_gcs,
base_path=base_path,
use_auth_token=use_auth_token,
)

# Build dataset for splits
dataset = self.builder.as_dataset(
split=self.split, ignore_verifications=ignore_verifications, in_memory=self.keep_in_memory
)
return dataset


class ParquetDatasetWriter:
def __init__(
self,
dataset: Dataset,
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
**parquet_writer_kwargs,
):
if version.parse(pa.__version__) < version.parse("3.0.0"):
raise ImportError(
"PyArrow >= 3.0.0 is required to used the ParquetDatasetWriter: pip install --upgrade pyarrow"
)
self.dataset = dataset
self.path_or_buf = path_or_buf
self.batch_size = batch_size
self.parquet_writer_kwargs = parquet_writer_kwargs

def write(self) -> int:
batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE

if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
with open(self.path_or_buf, "wb+") as buffer:
written = self._write(file_obj=buffer, batch_size=batch_size, **self.parquet_writer_kwargs)
else:
written = self._write(file_obj=self.path_or_buf, batch_size=batch_size, **self.parquet_writer_kwargs)
return written

def _write(self, file_obj: BinaryIO, batch_size: int, **parquet_writer_kwargs) -> int:
"""Writes the pyarrow table as Parquet to a binary file handle.

Caller is responsible for opening and closing the handle.
"""
written = 0
_ = parquet_writer_kwargs.pop("path_or_buf", None)
schema = pa.schema(self.dataset.features.type)
writer = pq.ParquetWriter(file_obj, schema=schema, **parquet_writer_kwargs)

for offset in range(0, len(self.dataset), batch_size):
batch = query_table(
table=self.dataset._data,
key=slice(offset, offset + batch_size),
indices=self.dataset._indices if self.dataset._indices is not None else None,
)
writer.write_table(batch)
            written += batch.nbytes
        # Close the writer so the Parquet footer is flushed to the file handle.
        writer.close()
        return written
2 changes: 2 additions & 0 deletions src/datasets/packaged_modules/__init__.py
@@ -6,6 +6,7 @@
from .csv import csv
from .json import json
from .pandas import pandas
from .parquet import parquet
from .text import text


@@ -30,5 +31,6 @@ def hash_python_lines(lines: List[str]) -> str:
"csv": (csv.__name__, hash_python_lines(inspect.getsource(csv).splitlines())),
"json": (json.__name__, hash_python_lines(inspect.getsource(json).splitlines())),
"pandas": (pandas.__name__, hash_python_lines(inspect.getsource(pandas).splitlines())),
"parquet": (parquet.__name__, hash_python_lines(inspect.getsource(parquet).splitlines())),
"text": (text.__name__, hash_python_lines(inspect.getsource(text).splitlines())),
}
Empty file.
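
With "parquet" registered in _PACKAGED_DATASETS_MODULES above, the loader can also be used by name through load_dataset. A short sketch (the data_files paths are placeholders):

from datasets import load_dataset

dataset = load_dataset("parquet", data_files={"train": "train.parquet", "test": "test.parquet"})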