From 1f0fc12d44598a3e2d490d2797cad4f62d29b79d Mon Sep 17 00:00:00 2001
From: Albert Villanova del Moral <8515462+albertvillanova@users.noreply.github.com>
Date: Tue, 27 Apr 2021 17:29:20 +0200
Subject: [PATCH 1/2] Implement Dataset to JSON (#2248)

* Test Dataset.to_json

* Implement JsonDatasetWriter

* Implement Dataset.to_json
---
 src/datasets/arrow_dataset.py | 22 ++++++++++++++++
 src/datasets/io/json.py       | 48 +++++++++++++++++++++++++++++++++--
 tests/test_arrow_dataset.py   | 10 ++++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index cef58f560ed..0733071623b 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -2652,6 +2652,28 @@ def to_dict(self, batch_size: Optional[int] = None, batched: bool = False) -> Un
             for offset in range(0, len(self), batch_size)
         )
 
+    def to_json(
+        self,
+        path_or_buf: Union[PathLike, BinaryIO],
+        batch_size: Optional[int] = None,
+        **to_json_kwargs,
+    ) -> int:
+        """Exports the dataset to JSON.
+
+        Args:
+            path_or_buf (``PathLike`` or ``BinaryIO``): Either a path to a file or a BinaryIO.
+            batch_size (Optional ``int``): Size of the batch to load in memory and write at once.
+                Defaults to :obj:`datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            to_json_kwargs: Parameters to pass to pandas's :func:`pandas.DataFrame.to_json`.
+
+        Returns:
+            int: The number of characters or bytes written.
+        """
+        # Dynamic import to avoid circular dependency
+        from .io.json import JsonDatasetWriter
+
+        return JsonDatasetWriter(self, path_or_buf, batch_size=batch_size, **to_json_kwargs).write()
+
     def to_pandas(
         self, batch_size: Optional[int] = None, batched: bool = False
     ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
diff --git a/src/datasets/io/json.py b/src/datasets/io/json.py
index fad2ca44d54..2175d17ed2b 100644
--- a/src/datasets/io/json.py
+++ b/src/datasets/io/json.py
@@ -1,6 +1,8 @@
-from typing import Optional
+import os
+from typing import BinaryIO, Optional, Union
 
-from .. import Features, NamedSplit
+from .. import Dataset, Features, NamedSplit, config
+from ..formatting import query_table
 from ..packaged_modules.json.json import Json
 from ..utils.typing import NestedDataStructureLike, PathLike
 from .abc import AbstractDatasetReader
@@ -52,3 +54,45 @@ def read(self):
             split=self.split, ignore_verifications=ignore_verifications, in_memory=self.keep_in_memory
         )
         return dataset
+
+
+class JsonDatasetWriter:
+    def __init__(
+        self,
+        dataset: Dataset,
+        path_or_buf: Union[PathLike, BinaryIO],
+        batch_size: Optional[int] = None,
+        **to_json_kwargs,
+    ):
+        self.dataset = dataset
+        self.path_or_buf = path_or_buf
+        self.batch_size = batch_size
+        self.to_json_kwargs = to_json_kwargs
+
+    def write(self) -> int:
+        batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE
+
+        if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
+            with open(self.path_or_buf, "wb+") as buffer:
+                written = self._write(file_obj=buffer, batch_size=batch_size, **self.to_json_kwargs)
+        else:
+            written = self._write(file_obj=self.path_or_buf, batch_size=batch_size, **self.to_json_kwargs)
+        return written
+
+    def _write(self, file_obj: BinaryIO, batch_size: int, encoding: str = "utf-8", **to_json_kwargs) -> int:
+        """Writes the pyarrow table as JSON to a binary file handle.
+
+        Caller is responsible for opening and closing the handle.
+ """ + written = 0 + _ = to_json_kwargs.pop("path_or_buf", None) + + for offset in range(0, len(self.dataset), batch_size): + batch = query_table( + table=self.dataset.data, + key=slice(offset, offset + batch_size), + indices=self.dataset._indices if self.dataset._indices is not None else None, + ) + json_str = batch.to_pandas().to_json(path_or_buf=None, **to_json_kwargs) + written += file_obj.write(json_str.encode(encoding)) + return written diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py index 245136e48b7..af86c0f93ef 100644 --- a/tests/test_arrow_dataset.py +++ b/tests/test_arrow_dataset.py @@ -2106,6 +2106,16 @@ def test_dataset_from_text(path_type, split, features, keep_in_memory, text_path assert dataset.features[feature].dtype == expected_dtype +def test_dataset_to_json(dataset, tmp_path): + file_path = tmp_path / "test_path.jsonl" + bytes_written = dataset.to_json(path_or_buf=file_path) + assert file_path.is_file() + assert bytes_written == file_path.stat().st_size + df = pd.read_json(file_path) + assert df.shape == dataset.shape + assert list(df.columns) == list(dataset.column_names) + + @pytest.mark.parametrize("in_memory", [False, True]) @pytest.mark.parametrize( "method_and_params", From 80e59ef178d3bb2090d091bc32315c655eb0633d Mon Sep 17 00:00:00 2001 From: Quentin Lhoest <42851186+lhoestq@users.noreply.github.com> Date: Tue, 27 Apr 2021 18:30:48 +0200 Subject: [PATCH 2/2] Update format, fingerprint and indices after add_item (#2254) * update format, fingerprint and indices after add_item * minor * rename to item_indices_table * test dataset._indices * fix class_encode_column issue --- src/datasets/arrow_dataset.py | 21 ++++++++++++++++++--- tests/test_arrow_dataset.py | 32 +++++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py index 0733071623b..db504dc9072 100644 --- a/src/datasets/arrow_dataset.py +++ b/src/datasets/arrow_dataset.py @@ -775,6 +775,7 @@ def class_encode_column(self, column: str) -> "Dataset": class_names = sorted(dset.unique(column)) dst_feat = ClassLabel(names=class_names) dset = dset.map(lambda batch: {column: dst_feat.str2int(batch)}, input_columns=column, batched=True) + dset = concatenate_datasets([self.remove_columns([column]), dset], axis=1) new_features = copy.deepcopy(dset.features) new_features[column] = dst_feat @@ -2899,10 +2900,12 @@ def add_elasticsearch_index( ) return self - def add_item(self, item: dict): + @transmit_format + @fingerprint_transform(inplace=False) + def add_item(self, item: dict, new_fingerprint: str): """Add item to Dataset. - .. versionadded:: 1.6 + .. versionadded:: 1.7 Args: item (dict): Item data to be added. 
@@ -2916,7 +2919,19 @@
         item_table = item_table.cast(schema)
         # Concatenate tables
         table = concat_tables([self._data, item_table])
-        return Dataset(table)
+        if self._indices is None:
+            indices_table = None
+        else:
+            item_indices_array = pa.array([len(self._data)], type=pa.uint64())
+            item_indices_table = InMemoryTable.from_arrays([item_indices_array], names=["indices"])
+            indices_table = concat_tables([self._indices, item_indices_table])
+        return Dataset(
+            table,
+            info=copy.deepcopy(self.info),
+            split=self.split,
+            indices_table=indices_table,
+            fingerprint=new_fingerprint,
+        )
 
 
 def concatenate_datasets(
diff --git a/tests/test_arrow_dataset.py b/tests/test_arrow_dataset.py
index af86c0f93ef..8c164901348 100644
--- a/tests/test_arrow_dataset.py
+++ b/tests/test_arrow_dataset.py
@@ -1949,6 +1949,10 @@ def test_concatenate_datasets_duplicate_columns(dataset):
     assert "duplicated" in str(excinfo.value)
 
 
+@pytest.mark.parametrize(
+    "transform",
+    [None, ("shuffle", (42,), {}), ("with_format", ("pandas",), {}), ("class_encode_column", ("col_2",), {})],
+)
 @pytest.mark.parametrize("in_memory", [False, True])
 @pytest.mark.parametrize(
     "item",
@@ -1959,22 +1963,32 @@ def test_concatenate_datasets_duplicate_columns(dataset):
         {"col_1": 4.0, "col_2": 4.0, "col_3": 4.0},
     ],
 )
-def test_dataset_add_item(item, in_memory, dataset_dict, arrow_path):
-    dataset = (
+def test_dataset_add_item(item, in_memory, dataset_dict, arrow_path, transform):
+    dataset_to_test = (
         Dataset(InMemoryTable.from_pydict(dataset_dict))
         if in_memory
         else Dataset(MemoryMappedTable.from_file(arrow_path))
     )
-    dataset = dataset.add_item(item)
+    if transform is not None:
+        transform_name, args, kwargs = transform
+        dataset_to_test: Dataset = getattr(dataset_to_test, transform_name)(*args, **kwargs)
+    dataset = dataset_to_test.add_item(item)
     assert dataset.data.shape == (5, 3)
-    expected_features = {"col_1": "string", "col_2": "int64", "col_3": "float64"}
-    assert dataset.data.column_names == list(expected_features.keys())
+    expected_features = dataset_to_test.features
+    assert sorted(dataset.data.column_names) == sorted(expected_features.keys())
     for feature, expected_dtype in expected_features.items():
-        assert dataset.features[feature].dtype == expected_dtype
-    assert len(dataset.data.blocks) == 1 if in_memory else 2  # multiple InMemoryTables are consolidated as one
-    dataset = dataset.add_item(item)
-    assert dataset.data.shape == (6, 3)
+        assert dataset.features[feature] == expected_dtype
     assert len(dataset.data.blocks) == 1 if in_memory else 2  # multiple InMemoryTables are consolidated as one
+    assert dataset.format["type"] == dataset_to_test.format["type"]
+    assert dataset._fingerprint != dataset_to_test._fingerprint
+    dataset.reset_format()
+    dataset_to_test.reset_format()
+    assert dataset[:-1] == dataset_to_test[:]
+    assert {k: int(v) for k, v in dataset[-1].items()} == {k: int(v) for k, v in item.items()}
+    if dataset._indices is not None:
+        dataset_indices = dataset._indices["indices"].to_pylist()
+        dataset_to_test_indices = dataset_to_test._indices["indices"].to_pylist()
+        assert dataset_indices == dataset_to_test_indices + [len(dataset_to_test._data)]
 
 
 @pytest.mark.parametrize("keep_in_memory", [False, True])
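
For reviewers of PATCH 1/2, a minimal usage sketch of the new Dataset.to_json API. It is illustrative only: it assumes a datasets installation that already includes this patch, the column names and values are made up, and the extra keyword arguments (orient, lines) are simply forwarded to pandas.DataFrame.to_json rather than being part of the new API surface.

    # Hypothetical usage of Dataset.to_json from PATCH 1/2; example data is made up.
    from datasets import Dataset

    ds = Dataset.from_dict({"col_1": ["a", "b"], "col_2": [1, 2]})

    # Writing to a path: JsonDatasetWriter opens the file in binary mode
    # and returns the number of bytes written.
    n_bytes = ds.to_json("data.json")
    print(n_bytes)

    # Extra keyword arguments are forwarded to pandas.DataFrame.to_json,
    # e.g. to emit JSON Lines instead of the pandas default layout.
    ds.to_json("data.jsonl", orient="records", lines=True)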
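
And a similar hedged sketch of what PATCH 2/2 changes about add_item: the returned dataset now keeps the caller's format, receives a new fingerprint, and extends the indices mapping (if one exists) with the position of the appended row. The data is made up and the assertions mirror the updated test above; this is not part of the patch series itself.

    # Hypothetical check of Dataset.add_item behaviour after PATCH 2/2; data is made up.
    from datasets import Dataset

    ds = Dataset.from_dict({"col_1": ["a", "b", "c"], "col_2": [0, 1, 2]})
    ds = ds.shuffle(seed=42)  # gives the dataset an indices mapping

    new_ds = ds.add_item({"col_1": "d", "col_2": 3})

    assert len(new_ds) == len(ds) + 1
    assert new_ds._fingerprint != ds._fingerprint      # fingerprint is updated
    assert new_ds.format["type"] == ds.format["type"]  # format is preserved
    assert new_ds[-1] == {"col_1": "d", "col_2": 3}    # appended row reachable through the extended indices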