From daaa0de61e2b05d0f7faefda4910714bc654a14d Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 02:40:57 +0100 Subject: [PATCH 01/10] Add option to disable type optimization --- src/datasets/arrow_writer.py | 14 +++++++++----- src/datasets/config.py | 5 +++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 79d1926cc97..c80702a90e6 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -82,12 +82,13 @@ class TypedSequence: """ - def __init__(self, data, type=None, try_type=None, optimized_int_type=None): + def __init__(self, data, type=None, try_type=None, optimized_int_type=None, col=None): assert type is None or try_type is None, "You cannot specify both type and try_type" self.data = data self.type = type self.try_type = try_type # is ignored if it doesn't match the data self.optimized_int_type = optimized_int_type + self.col = col # ignore, here for consistency with `OptimizedTypedSequence` def __arrow_array__(self, type=None): """This function is called when calling pa.array(typed_sequence)""" @@ -176,7 +177,7 @@ def __init__(self, data, type=None, try_type=None, col=None, optimized_int_type= } if type is None and try_type is None: optimized_int_type = optimized_int_type_by_col.get(col, None) - super().__init__(data, type=type, try_type=try_type, optimized_int_type=optimized_int_type) + super().__init__(data, type=type, try_type=try_type, optimized_int_type=optimized_int_type, col=col) class ArrowWriter: @@ -241,6 +242,7 @@ def __init__( self.current_rows: List[pa.Table] = [] self.pa_writer: Optional[pa.RecordBatchStreamWriter] = None self.hkey_record = [] + self.typed_sequence_cls = OptimizedTypedSequence if config.OPTIMIZE_PYARROW_TYPES else TypedSequence def __len__(self): """Return the number of writed and staged examples""" @@ -319,12 +321,12 @@ def write_examples_on_file(self): for col in cols: col_type = schema.field(col).type if schema is not None else None col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None - typed_sequence = OptimizedTypedSequence( + typed_sequence = self.typed_sequence_cls( [row[0][col] for row in self.current_examples], type=col_type, try_type=col_try_type, col=col ) pa_array = pa.array(typed_sequence) inferred_type = pa_array.type - first_example = pa.array(OptimizedTypedSequence(typed_sequence.data[:1], type=inferred_type))[0] + first_example = pa.array(self.typed_sequence_cls(typed_sequence.data[:1], type=inferred_type))[0] if pa_array[0] != first_example: # Sanity check (check for overflow in StructArray or ListArray) # This check fails with FloatArrays with nans, which is not what we want, so account for that: if not isinstance(pa_array[0], pa.lib.FloatScalar): @@ -422,7 +424,9 @@ def write_batch( for col in sorted(batch_examples.keys()): col_type = schema.field(col).type if schema is not None else None col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None - typed_sequence = OptimizedTypedSequence(batch_examples[col], type=col_type, try_type=col_try_type, col=col) + typed_sequence = self.typed_sequence_cls( + batch_examples[col], type=col_type, try_type=col_try_type, col=col + ) typed_sequence_examples[col] = typed_sequence pa_table = pa.Table.from_pydict(typed_sequence_examples) self.write_table(pa_table, writer_batch_size) diff --git a/src/datasets/config.py b/src/datasets/config.py index 3815e0ef583..98c1c98fd3a 100644 --- 
a/src/datasets/config.py +++ b/src/datasets/config.py @@ -165,6 +165,11 @@ # For big tables, we write them on disk instead MAX_TABLE_NBYTES_FOR_PICKLING = 4 << 30 +# Whether to optimize PyArrow types in order to reduce the size of the generated cache files +OPTIMIZE_PYARROW_TYPES = ( + os.environ.get("HF_DATASETS_OPTIMIZE_PYARROW_TYPES", "AUTO").upper() in ENV_VARS_TRUE_AND_AUTO_VALUES +) + # Offline mode HF_DATASETS_OFFLINE = os.environ.get("HF_DATASETS_OFFLINE", "AUTO").upper() in ENV_VARS_TRUE_VALUES From e8f6fae9aceba0168d59ce54b18c3924bc7113d7 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 15:30:13 +0100 Subject: [PATCH 02/10] Add a test --- tests/test_arrow_writer.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py index 4e66990cb46..bda04a41924 100644 --- a/tests/test_arrow_writer.py +++ b/tests/test_arrow_writer.py @@ -234,6 +234,18 @@ def test_optimized_typed_sequence(sequence, col, expected_dtype): assert get_base_dtype(arr.type) == expected_dtype +def test_arrow_writer_typed_sequence_cls(monkeypatch): + stream = pa.BufferOutputStream() + + with ArrowWriter(stream=stream) as writer: + assert writer.typed_sequence_cls == OptimizedTypedSequence + + monkeypatch.setattr(config, "OPTIMIZE_PYARROW_TYPES", False) + + with ArrowWriter(stream=stream) as writer: + assert writer.typed_sequence_cls == TypedSequence + + @pytest.mark.parametrize("raise_exception", [False, True]) def test_arrow_writer_closes_stream(raise_exception, tmp_path): path = str(tmp_path / "dataset-train.arrow") From fd3158dae0a23836a4a30977af8c5cdb0337752e Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 17:05:33 +0100 Subject: [PATCH 03/10] Add DISABLE prefix --- src/datasets/arrow_writer.py | 2 +- src/datasets/config.py | 6 +++--- tests/test_arrow_writer.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index c80702a90e6..e334921ee53 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -242,7 +242,7 @@ def __init__( self.current_rows: List[pa.Table] = [] self.pa_writer: Optional[pa.RecordBatchStreamWriter] = None self.hkey_record = [] - self.typed_sequence_cls = OptimizedTypedSequence if config.OPTIMIZE_PYARROW_TYPES else TypedSequence + self.typed_sequence_cls = TypedSequence if config.DISABLE_PYARROW_TYPES_OPTIMIZATION else OptimizedTypedSequence def __len__(self): """Return the number of writed and staged examples""" diff --git a/src/datasets/config.py b/src/datasets/config.py index 98c1c98fd3a..334f7eab32d 100644 --- a/src/datasets/config.py +++ b/src/datasets/config.py @@ -165,9 +165,9 @@ # For big tables, we write them on disk instead MAX_TABLE_NBYTES_FOR_PICKLING = 4 << 30 -# Whether to optimize PyArrow types in order to reduce the size of the generated cache files -OPTIMIZE_PYARROW_TYPES = ( - os.environ.get("HF_DATASETS_OPTIMIZE_PYARROW_TYPES", "AUTO").upper() in ENV_VARS_TRUE_AND_AUTO_VALUES +# Control PyArrow types optimization - used to reduce the size of the generated cache files +DISABLE_PYARROW_TYPES_OPTIMIZATION = ( + os.environ.get("HF_DATASETS_DISABLE_PYARROW_TYPES_OPTIMIZATION", "AUTO").upper() in ENV_VARS_TRUE_VALUES ) # Offline mode diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py index bda04a41924..a87e59648fd 100644 --- a/tests/test_arrow_writer.py +++ b/tests/test_arrow_writer.py @@ -240,7 +240,7 @@ def test_arrow_writer_typed_sequence_cls(monkeypatch): with 
ArrowWriter(stream=stream) as writer: assert writer.typed_sequence_cls == OptimizedTypedSequence - monkeypatch.setattr(config, "OPTIMIZE_PYARROW_TYPES", False) + monkeypatch.setattr(config, "DISABLE_PYARROW_TYPES_OPTIMIZATION", True) with ArrowWriter(stream=stream) as writer: assert writer.typed_sequence_cls == TypedSequence From b2e1cc550c05ddff80c916b44efbb1b57b16f5e7 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 17:10:51 +0100 Subject: [PATCH 04/10] Style --- src/datasets/arrow_writer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index e334921ee53..a32ca86f428 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -242,7 +242,9 @@ def __init__( self.current_rows: List[pa.Table] = [] self.pa_writer: Optional[pa.RecordBatchStreamWriter] = None self.hkey_record = [] - self.typed_sequence_cls = TypedSequence if config.DISABLE_PYARROW_TYPES_OPTIMIZATION else OptimizedTypedSequence + self.typed_sequence_cls = ( + TypedSequence if config.DISABLE_PYARROW_TYPES_OPTIMIZATION else OptimizedTypedSequence + ) def __len__(self): """Return the number of writed and staged examples""" From 65e80cb7b0f65f45a3d05b8f6b3820d0df276205 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 18:05:40 +0100 Subject: [PATCH 05/10] Revert changes --- src/datasets/arrow_writer.py | 11 +++-------- src/datasets/config.py | 5 ----- tests/test_arrow_writer.py | 12 ------------ 3 files changed, 3 insertions(+), 25 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index a32ca86f428..8d8fb7a574d 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -242,9 +242,6 @@ def __init__( self.current_rows: List[pa.Table] = [] self.pa_writer: Optional[pa.RecordBatchStreamWriter] = None self.hkey_record = [] - self.typed_sequence_cls = ( - TypedSequence if config.DISABLE_PYARROW_TYPES_OPTIMIZATION else OptimizedTypedSequence - ) def __len__(self): """Return the number of writed and staged examples""" @@ -323,12 +320,12 @@ def write_examples_on_file(self): for col in cols: col_type = schema.field(col).type if schema is not None else None col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None - typed_sequence = self.typed_sequence_cls( + typed_sequence = OptimizedTypedSequence( [row[0][col] for row in self.current_examples], type=col_type, try_type=col_try_type, col=col ) pa_array = pa.array(typed_sequence) inferred_type = pa_array.type - first_example = pa.array(self.typed_sequence_cls(typed_sequence.data[:1], type=inferred_type))[0] + first_example = pa.array(OptimizedTypedSequence(typed_sequence.data[:1], type=inferred_type))[0] if pa_array[0] != first_example: # Sanity check (check for overflow in StructArray or ListArray) # This check fails with FloatArrays with nans, which is not what we want, so account for that: if not isinstance(pa_array[0], pa.lib.FloatScalar): @@ -426,9 +423,7 @@ def write_batch( for col in sorted(batch_examples.keys()): col_type = schema.field(col).type if schema is not None else None col_try_type = try_schema.field(col).type if try_schema is not None and col in try_schema.names else None - typed_sequence = self.typed_sequence_cls( - batch_examples[col], type=col_type, try_type=col_try_type, col=col - ) + typed_sequence = OptimizedTypedSequence(batch_examples[col], type=col_type, try_type=col_try_type, col=col) typed_sequence_examples[col] = typed_sequence 
pa_table = pa.Table.from_pydict(typed_sequence_examples) self.write_table(pa_table, writer_batch_size) diff --git a/src/datasets/config.py b/src/datasets/config.py index 334f7eab32d..3815e0ef583 100644 --- a/src/datasets/config.py +++ b/src/datasets/config.py @@ -165,11 +165,6 @@ # For big tables, we write them on disk instead MAX_TABLE_NBYTES_FOR_PICKLING = 4 << 30 -# Control PyArrow types optimization - used to reduce the size of the generated cache files -DISABLE_PYARROW_TYPES_OPTIMIZATION = ( - os.environ.get("HF_DATASETS_DISABLE_PYARROW_TYPES_OPTIMIZATION", "AUTO").upper() in ENV_VARS_TRUE_VALUES -) - # Offline mode HF_DATASETS_OFFLINE = os.environ.get("HF_DATASETS_OFFLINE", "AUTO").upper() in ENV_VARS_TRUE_VALUES diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py index a87e59648fd..4e66990cb46 100644 --- a/tests/test_arrow_writer.py +++ b/tests/test_arrow_writer.py @@ -234,18 +234,6 @@ def test_optimized_typed_sequence(sequence, col, expected_dtype): assert get_base_dtype(arr.type) == expected_dtype -def test_arrow_writer_typed_sequence_cls(monkeypatch): - stream = pa.BufferOutputStream() - - with ArrowWriter(stream=stream) as writer: - assert writer.typed_sequence_cls == OptimizedTypedSequence - - monkeypatch.setattr(config, "DISABLE_PYARROW_TYPES_OPTIMIZATION", True) - - with ArrowWriter(stream=stream) as writer: - assert writer.typed_sequence_cls == TypedSequence - - @pytest.mark.parametrize("raise_exception", [False, True]) def test_arrow_writer_closes_stream(raise_exception, tmp_path): path = str(tmp_path / "dataset-train.arrow") From 9df83112a0c2e44c0db635273bd7b2312f7fa4c1 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 18:06:56 +0100 Subject: [PATCH 06/10] Remove col in TypedSequence --- src/datasets/arrow_writer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 8d8fb7a574d..79d1926cc97 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -82,13 +82,12 @@ class TypedSequence: """ - def __init__(self, data, type=None, try_type=None, optimized_int_type=None, col=None): + def __init__(self, data, type=None, try_type=None, optimized_int_type=None): assert type is None or try_type is None, "You cannot specify both type and try_type" self.data = data self.type = type self.try_type = try_type # is ignored if it doesn't match the data self.optimized_int_type = optimized_int_type - self.col = col # ignore, here for consistency with `OptimizedTypedSequence` def __arrow_array__(self, type=None): """This function is called when calling pa.array(typed_sequence)""" @@ -177,7 +176,7 @@ def __init__(self, data, type=None, try_type=None, col=None, optimized_int_type= } if type is None and try_type is None: optimized_int_type = optimized_int_type_by_col.get(col, None) - super().__init__(data, type=type, try_type=try_type, optimized_int_type=optimized_int_type, col=col) + super().__init__(data, type=type, try_type=try_type, optimized_int_type=optimized_int_type) class ArrowWriter: From f31593bf404084eaef1449c2150d27b2169aabcf Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 18:39:23 +0100 Subject: [PATCH 07/10] Add fallback in case of range error --- src/datasets/arrow_writer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 79d1926cc97..c6de094beb1 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -130,6 +130,7 @@ def 
__arrow_array__(self, type=None): "Specified try_type alters data. Please check that the type/feature that you provided match the type/features of the data." ) if self.optimized_int_type and self.type is None and self.try_type is None: + trying_int_optimization = True if pa.types.is_int64(out.type): out = out.cast(self.optimized_int_type) elif pa.types.is_list(out.type): @@ -154,6 +155,8 @@ def __arrow_array__(self, type=None): type_(self.data), e ) ) from None + elif trying_int_optimization and "not in range" in str(e): + return out else: raise elif "overflow" in str(e): @@ -162,6 +165,8 @@ def __arrow_array__(self, type=None): type_(self.data), e ) ) from None + elif trying_int_optimization and "not in range" in str(e): + return out else: raise From a5c3d8c500c0034646cd3699545c5c5b660ad75d Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 18:39:34 +0100 Subject: [PATCH 08/10] Add test --- tests/test_arrow_writer.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_arrow_writer.py b/tests/test_arrow_writer.py index 4e66990cb46..198ed6bb4ab 100644 --- a/tests/test_arrow_writer.py +++ b/tests/test_arrow_writer.py @@ -1,7 +1,9 @@ +import copy import os import tempfile from unittest import TestCase +import numpy as np import pyarrow as pa import pytest @@ -211,6 +213,13 @@ def get_base_dtype(arr_type): return arr_type +def change_first_primitive_element_in_list(lst, value): + if isinstance(lst[0], list): + change_first_primitive_element_in_list(lst[0], value) + else: + lst[0] = value + + @pytest.mark.parametrize("optimized_int_type, expected_dtype", [(None, pa.int64()), (pa.int32(), pa.int32())]) @pytest.mark.parametrize("sequence", [[1, 2, 3], [[1, 2, 3]], [[[1, 2, 3]]]]) def test_optimized_int_type_for_typed_sequence(sequence, optimized_int_type, expected_dtype): @@ -230,9 +239,19 @@ def test_optimized_int_type_for_typed_sequence(sequence, optimized_int_type, exp ) @pytest.mark.parametrize("sequence", [[1, 2, 3], [[1, 2, 3]], [[[1, 2, 3]]]]) def test_optimized_typed_sequence(sequence, col, expected_dtype): + # in range arr = pa.array(OptimizedTypedSequence(sequence, col=col)) assert get_base_dtype(arr.type) == expected_dtype + # not in range + if col != "other": + # avoids errors due to in-place modifications + sequence = copy.deepcopy(sequence) + value = np.iinfo(expected_dtype.to_pandas_dtype()).max + 1 + change_first_primitive_element_in_list(sequence, value) + arr = pa.array(OptimizedTypedSequence(sequence, col=col)) + assert get_base_dtype(arr.type) == pa.int64() + @pytest.mark.parametrize("raise_exception", [False, True]) def test_arrow_writer_closes_stream(raise_exception, tmp_path): From 81e6bbe123d55146dbfce6d58c79209dcd9d1270 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Mon, 8 Nov 2021 18:54:48 +0100 Subject: [PATCH 09/10] Fix --- src/datasets/arrow_writer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index c6de094beb1..7e531e393f6 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -100,6 +100,7 @@ def __arrow_array__(self, type=None): trying_type = True else: type = self.type + trying_int_optimization = False try: if isinstance(type, _ArrayXDExtensionType): if isinstance(self.data, np.ndarray): From f853475cee08f1a8644948adfbb135b651b79a71 Mon Sep 17 00:00:00 2001 From: mariosasko Date: Tue, 9 Nov 2021 11:37:55 +0100 Subject: [PATCH 10/10] Log info message --- src/datasets/arrow_writer.py | 4 ++++ 1 file changed, 4 insertions(+) diff 
--git a/src/datasets/arrow_writer.py b/src/datasets/arrow_writer.py index 7e531e393f6..4d2783963cc 100644 --- a/src/datasets/arrow_writer.py +++ b/src/datasets/arrow_writer.py @@ -157,6 +157,8 @@ def __arrow_array__(self, type=None): ) ) from None elif trying_int_optimization and "not in range" in str(e): + optimized_int_type_str = np.dtype(self.optimized_int_type.to_pandas_dtype()).name + logger.info(f"Failed to cast a sequence to {optimized_int_type_str}. Falling back to int64.") return out else: raise @@ -167,6 +169,8 @@ def __arrow_array__(self, type=None): ) ) from None elif trying_int_optimization and "not in range" in str(e): + optimized_int_type_str = np.dtype(self.optimized_int_type.to_pandas_dtype()).name + logger.info(f"Failed to cast a sequence to {optimized_int_type_str}. Falling back to int64.") return out else: raise
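
A minimal sketch of the behavior this series converges on, assuming the final state of src/datasets/arrow_writer.py and the default column mapping inside OptimizedTypedSequence (which, in the library source at the time of this PR, pins "attention_mask" to pa.int8(); that mapping is not shown in the hunks above):

import pyarrow as pa

from datasets.arrow_writer import OptimizedTypedSequence

# Values that fit in int8 are downcast, which is what shrinks the
# generated cache files.
arr = pa.array(OptimizedTypedSequence([0, 1, 1], col="attention_mask"))
assert arr.type == pa.int8()

# A value outside the int8 range makes the cast raise an ArrowInvalid
# with "Integer value 300 not in range"; the fallback added in PATCH 07
# catches it, logs an info message (PATCH 10), and keeps the original
# int64 array instead of failing the write.
arr = pa.array(OptimizedTypedSequence([0, 1, 300], col="attention_mask"))
assert arr.type == pa.int64()

This mirrors the "not in range" case exercised by test_optimized_typed_sequence in PATCH 08: the optimization is attempted opportunistically and silently degrades to int64, so callers never see the cast error.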