From 244ff9a9c85d02cbaee6a6b70fb32fdd9cd1ac6e Mon Sep 17 00:00:00 2001
From: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
Date: Fri, 22 Nov 2024 02:35:40 -0800
Subject: [PATCH] Revert "[Data] Fix pandas memory calculation." (#48866)

Reverts ray-project/ray#46939 for:

https://github.com/ray-project/ray/issues/48865
https://github.com/ray-project/ray/issues/48864
https://github.com/ray-project/ray/issues/48863
https://github.com/ray-project/ray/issues/48862

---
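Reviewer note (not part of the commit): with this revert, size_bytes()
again delegates to pandas' own deep accounting, which sizes each element
of an object column with sys.getsizeof and does not follow references
inside nested containers. The sketch below is illustrative only (plain
pandas and the standard library, nothing from this patch) and shows the
gap the reverted get_deep_size() walk tried to close:

    import sys

    import pandas as pd

    # An object column whose elements are Python lists: deep=True sizes each
    # list with sys.getsizeof, which counts the list object and its pointer
    # array but not the int objects those pointers reference.
    df = pd.DataFrame({"lists": [[i, i + 1, i + 2] for i in range(1_000)]})

    deep_total = int(df.memory_usage(index=True, deep=True).sum())
    # Also count the referenced elements (a rough figure: CPython interns
    # small ints, so this overstates the real footprint).
    element_total = sum(sys.getsizeof(x) for row in df["lists"] for x in row)

    print(deep_total, deep_total + element_total)  # second figure is larger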
 python/ray/data/_internal/pandas_block.py  |  70 +------
 python/ray/data/tests/test_pandas_block.py | 229 +--------------------
 2 files changed, 3 insertions(+), 296 deletions(-)

diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py
index ff1686c1b355..119469b46c1b 100644
--- a/python/ray/data/_internal/pandas_block.py
+++ b/python/ray/data/_internal/pandas_block.py
@@ -1,6 +1,5 @@
 import collections
 import heapq
-import sys
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -295,74 +294,7 @@ def num_rows(self) -> int:
         return self._table.shape[0]
 
     def size_bytes(self) -> int:
-        from pandas.api.types import is_object_dtype
-
-        from ray.data.extensions import TensorArrayElement, TensorDtype
-
-        pd = lazy_import_pandas()
-
-        def get_deep_size(obj):
-            """Calculates the memory size of objects,
-            including nested objects using an iterative approach."""
-            seen = set()
-            total_size = 0
-            objects = collections.deque([obj])
-            while objects:
-                current = objects.pop()
-
-                # Skip interning-eligible immutable objects
-                if isinstance(current, (str, bytes, int, float)):
-                    size = sys.getsizeof(current)
-                    total_size += size
-                    continue
-
-                # Check if the object has been seen before
-                if id(current) in seen:
-                    continue
-                seen.add(id(current))
-
-                try:
-                    size = sys.getsizeof(current)
-                except TypeError:
-                    size = 0
-                total_size += size
-
-                # Handle specific cases
-                if isinstance(current, np.ndarray):
-                    total_size += current.nbytes - size  # Avoid double counting
-                elif isinstance(current, pd.DataFrame):
-                    total_size += (
-                        current.memory_usage(index=True, deep=True).sum() - size
-                    )
-                elif isinstance(current, (list, tuple, set)):
-                    objects.extend(current)
-                elif isinstance(current, dict):
-                    objects.extend(current.keys())
-                    objects.extend(current.values())
-                elif isinstance(current, TensorArrayElement):
-                    objects.extend(current.to_numpy())
-            return total_size
-
-        # Get initial memory usage including deep introspection
-        memory_usage = self._table.memory_usage(index=True, deep=True)
-
-        # TensorDtype for ray.air.util.tensor_extensions.pandas.TensorDtype
-        object_need_check = (TensorDtype,)
-        # Handle object columns separately
-        for column in self._table.columns:
-            # Check pandas object dtype and the extenstion dtype
-            if is_object_dtype(self._table[column].dtype) or isinstance(
-                self._table[column].dtype, object_need_check
-            ):
-                column_memory = 0
-                for element in self._table[column]:
-                    column_memory += get_deep_size(element)
-                memory_usage[column] = column_memory
-
-        # Sum up total memory usage
-        total_memory_usage = memory_usage.sum()
-
-        return int(total_memory_usage)
+        return int(self._table.memory_usage(index=True, deep=True).sum())
 
     def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
         r = self.to_pandas().copy(deep=False)
diff --git a/python/ray/data/tests/test_pandas_block.py b/python/ray/data/tests/test_pandas_block.py
index 5e83abbf04d7..4585d0e2a133 100644
--- a/python/ray/data/tests/test_pandas_block.py
+++ b/python/ray/data/tests/test_pandas_block.py
@@ -1,10 +1,4 @@
-import pickle
-import random
-import sys
-
-import numpy as np
 import pandas as pd
-import pyarrow as pa
 import pytest
 
 import ray
@@ -54,226 +48,7 @@ def fn2(batch):
     assert isinstance(block, pd.DataFrame)
 
 
-class TestSizeBytes:
-    def test_small(ray_start_regular_shared):
-        animals = ["Flamingo", "Centipede"]
-        block = pd.DataFrame({"animals": animals})
-
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        # check that memory usage is within 10% of the size_bytes
-        # For strings, Pandas seems to be fairly accurate, so let's use that.
-        memory_usage = block.memory_usage(index=True, deep=True).sum()
-        assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
-            bytes_size,
-            memory_usage,
-        )
-
-    def test_large_str(ray_start_regular_shared):
-        animals = [
-            random.choice(["alligator", "crocodile", "centipede", "flamingo"])
-            for i in range(100_000)
-        ]
-        block = pd.DataFrame({"animals": animals})
-        block["animals"] = block["animals"].astype("string")
-
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        memory_usage = block.memory_usage(index=True, deep=True).sum()
-        assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
-            bytes_size,
-            memory_usage,
-        )
-
-    def test_large_str_object(ray_start_regular_shared):
-        """Note - this test breaks if you refactor/move the list of animals."""
-        num = 100_000
-        animals = [
-            random.choice(["alligator", "crocodile", "centipede", "flamingo"])
-            for i in range(num)
-        ]
-        block = pd.DataFrame({"animals": animals})
-
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        memory_usage = sum([sys.getsizeof(animal) for animal in animals])
-
-        assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
-            bytes_size,
-            memory_usage,
-        )
-
-    def test_large_floats(ray_start_regular_shared):
-        animals = [random.random() for i in range(100_000)]
-        block = pd.DataFrame({"animals": animals})
-
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        memory_usage = pickle.dumps(block).__sizeof__()
-        # check that memory usage is within 10% of the size_bytes
-        assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
-            bytes_size,
-            memory_usage,
-        )
-
-    def test_bytes_object(ray_start_regular_shared):
-        def generate_data(batch):
-            for _ in range(8):
-                yield {"data": [[b"\x00" * 128 * 1024 * 128]]}
-
-        ds = (
-            ray.data.range(1, override_num_blocks=1)
-            .map_batches(generate_data, batch_size=1)
-            .map_batches(lambda batch: batch, batch_format="pandas")
-        )
-
-        true_value = 128 * 1024 * 128 * 8
-        for bundle in ds.iter_internal_ref_bundles():
-            size = bundle.size_bytes()
-            # assert that true_value is within 10% of bundle.size_bytes()
-            assert size == pytest.approx(true_value, rel=0.1), (
-                size,
-                true_value,
-            )
-
-    def test_nested_numpy(ray_start_regular_shared):
-        size = 1024
-        rows = 1_000
-        data = [
-            np.random.randint(size=size, low=0, high=100, dtype=np.int8)
-            for _ in range(rows)
-        ]
-        df = pd.DataFrame({"data": data})
-
-        block_accessor = PandasBlockAccessor.for_block(df)
-        block_size = block_accessor.size_bytes()
-        true_value = rows * size
-        assert block_size == pytest.approx(true_value, rel=0.1), (
-            block_size,
-            true_value,
-        )
-
-    def test_nested_objects(ray_start_regular_shared):
-        size = 10
-        rows = 10_000
-        lists = [[random.randint(0, 100) for _ in range(size)] for _ in range(rows)]
-        data = {"lists": lists}
-        block = pd.DataFrame(data)
-
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        # List overhead + 10 integers per list
-        true_size = rows * (
-            sys.getsizeof([random.randint(0, 100) for _ in range(size)]) + size * 28
-        )
-
-        assert bytes_size == pytest.approx(true_size, rel=0.1), (
-            bytes_size,
-            true_size,
-        )
-
-    def test_mixed_types(ray_start_regular_shared):
-        rows = 10_000
-
-        data = {
-            "integers": [random.randint(0, 100) for _ in range(rows)],
-            "floats": [random.random() for _ in range(rows)],
-            "strings": [
-                random.choice(["apple", "banana", "cherry"]) for _ in range(rows)
-            ],
-            "object": [b"\x00" * 128 for _ in range(rows)],
-        }
-        block = pd.DataFrame(data)
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        # Manually calculate the size
-        int_size = rows * 8
-        float_size = rows * 8
-        str_size = sum(sys.getsizeof(string) for string in data["strings"])
-        object_size = rows * sys.getsizeof(b"\x00" * 128)
-
-        true_size = int_size + float_size + str_size + object_size
-        assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)
-
-    def test_nested_lists_strings(ray_start_regular_shared):
-        rows = 5_000
-        nested_lists = ["a"] * 3 + ["bb"] * 4 + ["ccc"] * 3
-        data = {
-            "nested_lists": [nested_lists for _ in range(rows)],
-        }
-        block = pd.DataFrame(data)
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        # Manually calculate the size
-        list_overhead = sys.getsizeof(block["nested_lists"].iloc[0]) + sum(
-            [sys.getsizeof(x) for x in nested_lists]
-        )
-        true_size = rows * list_overhead
-        assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)
-
-    @pytest.mark.parametrize("size", [10, 1024])
-    def test_multi_level_nesting(ray_start_regular_shared, size):
-        rows = 1_000
-        data = {
-            "complex": [
-                {"list": [np.random.rand(size)], "value": {"key": "val"}}
-                for _ in range(rows)
-            ],
-        }
-        block = pd.DataFrame(data)
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        numpy_size = np.random.rand(size).nbytes
-
-        values = ["list", "value", "key", "val"]
-        str_size = sum([sys.getsizeof(v) for v in values])
-
-        list_ref_overhead = sys.getsizeof([np.random.rand(size)])
-
-        dict_overhead1 = sys.getsizeof({"key": "val"})
-
-        dict_overhead3 = sys.getsizeof(
-            {"list": [np.random.rand(size)], "value": {"key": "val"}}
-        )
-
-        true_size = (
-            numpy_size + str_size + list_ref_overhead + dict_overhead1 + dict_overhead3
-        ) * rows
-        assert bytes_size == pytest.approx(true_size, rel=0.15), (
-            bytes_size,
-            true_size,
-        )
-
-    def test_boolean(ray_start_regular_shared):
-        data = [random.choice([True, False, None]) for _ in range(100_000)]
-        block = pd.DataFrame({"flags": pd.Series(data, dtype="boolean")})
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        # No object case
-        true_size = block.memory_usage(index=True, deep=True).sum()
-        assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)
-
-    def test_arrow(ray_start_regular_shared):
-        data = [
-            random.choice(["alligator", "crocodile", "flamingo"]) for _ in range(50_000)
-        ]
-        arrow_dtype = pd.ArrowDtype(pa.string())
-        block = pd.DataFrame({"animals": pd.Series(data, dtype=arrow_dtype)})
-        block_accessor = PandasBlockAccessor.for_block(block)
-        bytes_size = block_accessor.size_bytes()
-
-        true_size = block.memory_usage(index=True, deep=True).sum()
-        assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)
-
-
 if __name__ == "__main__":
+    import sys
+
     sys.exit(pytest.main(["-v", __file__]))
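Reviewer note (not part of the commit): a quick local sanity check of the
restored behavior. This is a minimal sketch assuming a Ray checkout with
this patch applied; PandasBlockAccessor, for_block(), and size_bytes() are
all taken from the diff above:

    import pandas as pd

    from ray.data._internal.pandas_block import PandasBlockAccessor

    block = pd.DataFrame({"animals": ["Flamingo", "Centipede"]})
    accessor = PandasBlockAccessor.for_block(block)

    # After the revert, size_bytes() should equal pandas' deep memory usage
    # exactly, not merely to within the 10% tolerance the removed tests used.
    assert accessor.size_bytes() == int(
        block.memory_usage(index=True, deep=True).sum()
    )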