Implement Dataset from Parquet #2247

Closed
21 changes: 16 additions & 5 deletions src/datasets/arrow_dataset.py
@@ -70,11 +70,6 @@

logger = get_logger(__name__)

if int(pa.__version__.split(".")[0]) == 0:
PYARROW_V0 = True
else:
PYARROW_V0 = False


class DatasetInfoMixin:
"""This base class exposes some attributes of DatasetInfo
@@ -323,6 +318,22 @@ def from_file(
indices_table=indices_pa_table,
)

@classmethod
def from_parquet(cls, filename: str, in_memory: bool = True):
"""Create Dataset from Parquet file.

Args:
filename (:obj:`str`): Path of the Parquet file.
in_memory (:obj:`bool`, default ``True``): Whether to copy the data in-memory.

Returns:
:class:`Dataset`
"""
from .arrow_reader import ParquetReader
@lhoestq (Member) commented on Apr 26, 2021:

Thanks for diving into this!

The ParquetReader unfortunately always brings the data into memory; the memory-map option is there only for reading performance. Under the hood, Parquet data is compressed, so you can't simply memory-map the file: there's no on-the-fly decoding, as far as I know.

I think the best option would be to implement an ArrowBasedBuilder, as we did for csv/json/etc. (see the sketch after this file's diff).


dataset_kwargs = ParquetReader("", None).read_files([{"filename": filename}], in_memory=in_memory)
return cls(dataset_kwargs["arrow_table"])

@classmethod
def from_buffer(
cls,
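
For orientation, a minimal usage sketch of the new classmethod (the path is a placeholder):

```python
from datasets import Dataset

# Hypothetical local file; for now the data is always materialized in memory,
# regardless of in_memory (see the review comment above).
ds = Dataset.from_parquet("path/to/file.parquet", in_memory=True)
print(ds.num_rows, ds.column_names)
```

Following up on the review comment, here is a rough sketch of the suggested ArrowBasedBuilder approach, modeled on the existing csv/json packaged modules. The class name, config handling, and split logic below are illustrative assumptions, not code from this PR:

```python
import pyarrow.parquet as pq

import datasets


class Parquet(datasets.ArrowBasedBuilder):
    """Illustrative sketch of a packaged Parquet builder (not part of this PR)."""

    def _info(self):
        # A real module would expose a BuilderConfig with a `features` field;
        # with no features given, they are inferred from the Arrow schema.
        return datasets.DatasetInfo()

    def _split_generators(self, dl_manager):
        files = dl_manager.download_and_extract(self.config.data_files)
        if isinstance(files, (str, list, tuple)):
            return [datasets.SplitGenerator(name=datasets.Split.TRAIN, gen_kwargs={"files": files})]
        # dict mapping split names to lists of files
        return [
            datasets.SplitGenerator(name=split, gen_kwargs={"files": split_files})
            for split, split_files in files.items()
        ]

    def _generate_tables(self, files):
        if isinstance(files, str):
            files = [files]
        for i, file in enumerate(files):
            # Decoding into memory is fine here: the builder writes each table to
            # an Arrow cache file, which load_dataset can then memory-map.
            yield i, pq.read_table(file)
```
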
9 changes: 5 additions & 4 deletions src/datasets/arrow_reader.py
@@ -343,19 +343,20 @@ def __init__(self, path: str, info: Optional["DatasetInfo"]):
super().__init__(path, info)
self._filetype_suffix = "parquet"

def _get_table_from_filename(self, filename_skip_take, **kwargs):
def _get_table_from_filename(self, filename_skip_take, in_memory=False):
"""Returns a Dataset instance from given (filename, skip, take)."""
filename, skip, take = (
filename_skip_take["filename"],
filename_skip_take["skip"] if "skip" in filename_skip_take else None,
filename_skip_take["take"] if "take" in filename_skip_take else None,
)
# Parquet read_table always loads data in memory, independently of memory_map
pa_table = pq.read_table(filename, memory_map=True)
# TODO: Parquet read_table always loads data in memory, independently of memory_map
pa_table = pq.read_table(filename, memory_map=not in_memory)
# here we don't want to slice an empty table, or it may segfault
if skip is not None and take is not None and not (skip == 0 and take == len(pa_table)):
pa_table = pa_table.slice(skip, take)
return pa_table
table_cls = InMemoryTable # if in_memory else MemoryMappedTable
return table_cls(pa_table)


@dataclass(frozen=True)
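
For context on the TODO above, a tiny standalone check (not part of the diff; the path is a placeholder) shows why memory_map cannot avoid allocations for Parquet: the column pages still have to be decompressed and decoded into fresh Arrow buffers. This is essentially what the assert_arrow_memory_increases helper checks in the tests below.

```python
import pyarrow as pa
import pyarrow.parquet as pq

before = pa.total_allocated_bytes()
# memory_map only changes how the raw file bytes are accessed; the decoded
# columns are always newly allocated Arrow buffers, so usage grows either way.
table = pq.read_table("path/to/file.parquet", memory_map=True)
print("allocated:", pa.total_allocated_bytes() - before, "bytes")
```
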
3 changes: 3 additions & 0 deletions src/datasets/config.py
@@ -3,6 +3,7 @@
import sys
from pathlib import Path

import pyarrow as pa
from packaging import version

from .utils.logging import get_logger
@@ -31,6 +32,8 @@
else:
import importlib.metadata as importlib_metadata

PYARROW_VERSION: str = pa.__version__

# General environment variables accepted values for booleans
ENV_VARS_TRUE_VALUES = {"1", "ON", "YES", "TRUE"}
ENV_VARS_TRUE_AND_AUTO_VALUES = ENV_VARS_TRUE_VALUES.union({"AUTO"})
3 changes: 1 addition & 2 deletions src/datasets/utils/file_utils.py
@@ -27,7 +27,6 @@

import numpy as np
import posixpath
import pyarrow as pa
import requests
from tqdm.auto import tqdm

@@ -365,7 +364,7 @@ def cached_path(

def get_datasets_user_agent(user_agent: Optional[Union[str, dict]] = None) -> str:
ua = "datasets/{}; python/{}".format(__version__, config.PY_VERSION)
ua += "; pyarrow/{}".format(pa.__version__)
ua += "; pyarrow/{}".format(config.PYARROW_VERSION)
if config.TORCH_AVAILABLE:
ua += "; torch/{}".format(config.TORCH_VERSION)
if config.TF_AVAILABLE:
9 changes: 9 additions & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
import lzma
import textwrap

import pyarrow.parquet as pq
import pytest

from datasets.arrow_dataset import Dataset
@@ -63,6 +64,14 @@ def arrow_file(tmp_path_factory, dataset):
return filename


@pytest.fixture(scope="session")
def parquet_file(tmp_path_factory, dataset):
filename = str(tmp_path_factory.mktemp("data") / "file.parquet")
dataset.map(cache_file_name=filename)
pq.write_table(dataset.data.table, filename)
return filename


@pytest.fixture(scope="session")
def text_file(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.txt"
23 changes: 22 additions & 1 deletion tests/test_arrow_dataset.py
@@ -13,7 +13,7 @@
from absl.testing import parameterized

import datasets.arrow_dataset
from datasets import NamedSplit, concatenate_datasets, load_from_disk, temp_seed
from datasets import NamedSplit, concatenate_datasets, config, load_from_disk, temp_seed
from datasets.arrow_dataset import Dataset, transmit_format, update_metadata_with_features
from datasets.dataset_dict import DatasetDict
from datasets.features import Array2D, Array3D, ClassLabel, Features, Sequence, Value
@@ -1995,6 +1995,27 @@ def test_dataset_from_file(in_memory, dataset, arrow_file):
assert dataset_from_file.cache_files == ([{"filename": filename}] if not in_memory else [])


@pytest.mark.skipif(
config.PYARROW_VERSION < "2", reason="pyarrow.lib.ArrowInvalid: Mix of struct and list types not yet supported"
)
@pytest.mark.parametrize("in_memory", [True]) # TODO: False, once memory_map properly implemented for pq.read_table
def test_dataset_from_parquet(in_memory, dataset, parquet_file):
filename = parquet_file
with assert_arrow_memory_increases() if in_memory else assert_arrow_memory_doesnt_increase():
dataset_from_parquet = Dataset.from_parquet(filename, in_memory=in_memory)
assert dataset_from_parquet.features.type == dataset.features.type
# assert dataset_from_parquet.features == dataset.features
# TODO: "labels": Value vs. ClassLabel
# Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)
# Sequence(feature=ClassLabel(num_classes=2, names=['negative', 'positive'], names_file=None, id=None), length=-1, id=None)
# TODO: "answers":
# {'answer_start': Sequence(feature=Value(dtype='int32', id=None), length=-1, id=None), 'text': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None)}
# Sequence(feature={'text': Value(dtype='string', id=None), 'answer_start': Value(dtype='int32', id=None)}, length=-1, id=None)
for feature_name in ["tokens", "id"]:
assert dataset_from_parquet.features[feature_name] == dataset.features[feature_name]
assert dataset_from_parquet.cache_files == ([{"filename": filename}] if not in_memory else [])


@pytest.mark.parametrize("keep_in_memory", [False, True])
@pytest.mark.parametrize(
"features",
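
To make the feature mismatch in the TODOs above concrete, a hedged sketch (the scratch path is a placeholder, and the round-tripped types described follow the TODO comments rather than a guarantee):

```python
import pyarrow.parquet as pq

from datasets import ClassLabel, Dataset, Features, Sequence, Value

features = Features(
    {
        "tokens": Sequence(Value("string")),
        "labels": Sequence(ClassLabel(names=["negative", "positive"])),
    }
)
ds = Dataset.from_dict({"tokens": [["a", "b"]], "labels": [[0, 1]]}, features=features)

pq.write_table(ds.data.table, "scratch.parquet")  # hypothetical scratch file
round_tripped = Dataset.from_parquet("scratch.parquet")

# Plain types such as "tokens" compare equal, but per the TODOs above "labels"
# may come back as Sequence(Value("int64")) instead of Sequence(ClassLabel(...)):
# only the raw Arrow/Parquet types survive, not the datasets-level annotations.
print(round_tripped.features)
```
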
23 changes: 22 additions & 1 deletion tests/test_arrow_reader.py
@@ -6,8 +6,9 @@
import pyarrow as pa
import pytest

from datasets import config
from datasets.arrow_dataset import Dataset
from datasets.arrow_reader import ArrowReader, BaseReader, ReadInstruction
from datasets.arrow_reader import ArrowReader, BaseReader, ParquetReader, ReadInstruction
from datasets.info import DatasetInfo
from datasets.splits import NamedSplit, Split, SplitDict, SplitInfo

@@ -115,6 +116,26 @@ def test_read_files(in_memory, dataset, arrow_file):
assert dict(table.to_pydict()) == dict(dataset.data.to_pydict()) # to_pydict returns OrderedDict


class TestParquetReader:
@pytest.mark.skipif(
config.PYARROW_VERSION < "2", reason="pyarrow.lib.ArrowInvalid: Mix of struct and list types not yet supported"
)
@pytest.mark.parametrize("in_memory", [True])
def test_read_files(self, in_memory, dataset, parquet_file):
filename = parquet_file
reader = ParquetReader("", None)
# TODO: Parquet read_table always loads data in memory, independently of memory_map
with assert_arrow_memory_increases() if in_memory else assert_arrow_memory_doesnt_increase():
dataset_kwargs = reader.read_files([{"filename": filename}], in_memory=in_memory)
assert dataset_kwargs.keys() == set(["arrow_table", "info", "split"])
table = dataset_kwargs["arrow_table"]
assert table.shape == dataset.data.shape
assert set(table.column_names) == set(dataset.data.column_names)
assert dict(table.to_pydict()) == dict(dataset.data.to_pydict()) # to_pydict returns OrderedDict
assert dataset_kwargs["info"] is None
assert dataset_kwargs["split"] is None


def test_read_instruction_spec():
assert ReadInstruction("train", to=10, unit="abs").to_spec() == "train[:10]"
assert ReadInstruction("train", from_=-80, to=10, unit="%").to_spec() == "train[-80%:10%]"
3 changes: 2 additions & 1 deletion tests/test_arrow_writer.py
@@ -6,6 +6,7 @@
import pytest
from packaging import version

from datasets import config
from datasets.arrow_writer import ArrowWriter, OptimizedTypedSequence, TypedSequence
from datasets.features import Array2DExtensionType

@@ -56,7 +57,7 @@ def test_try_incompatible_extension_type(self):
self.assertEqual(arr.type, pa.string())

def test_catch_overflow(self):
if version.parse(pa.__version__) < version.parse("2.0.0"):
if version.parse(config.PYARROW_VERSION) < version.parse("2.0.0"):
with self.assertRaises(OverflowError):
_ = pa.array(TypedSequence([["x" * 1024]] * ((2 << 20) + 1))) # ListArray with a bit more than 2GB
