From 8a0f8109161382a94ee5d733bc392a521d9f4cba Mon Sep 17 00:00:00 2001 From: zhilong <121425509+Bye-legumes@users.noreply.github.com> Date: Thu, 21 Nov 2024 16:41:55 -0500 Subject: [PATCH] [Data] Fix pandas memory calculation. (#46939) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why are these changes needed? close https://github.com/ray-project/ray/issues/46785 Current the memory usage for pandas is not accurate when it's object, so we just implement to calculated it in recursion in case of nested. ## Related issue number closes https://github.com/ray-project/ray/issues/46785, closes https://github.com/ray-project/ray/issues/48506 ## Checks - [√] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [√] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [√] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: zhilong Signed-off-by: Richard Liaw Co-authored-by: Richard Liaw --- python/ray/data/_internal/pandas_block.py | 70 ++++++- python/ray/data/tests/test_pandas_block.py | 229 ++++++++++++++++++++- 2 files changed, 296 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index 119469b46c1b..ff1686c1b355 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -1,5 +1,6 @@ import collections import heapq +import sys from typing import ( TYPE_CHECKING, Any, @@ -294,7 +295,74 @@ def num_rows(self) -> int: return self._table.shape[0] def size_bytes(self) -> int: - return int(self._table.memory_usage(index=True, deep=True).sum()) + from pandas.api.types import is_object_dtype + + from ray.data.extensions import TensorArrayElement, TensorDtype + + pd = lazy_import_pandas() + + def get_deep_size(obj): + """Calculates the memory size of objects, + including nested objects using an iterative approach.""" + seen = set() + total_size = 0 + objects = collections.deque([obj]) + while objects: + current = objects.pop() + + # Skip interning-eligible immutable objects + if isinstance(current, (str, bytes, int, float)): + size = sys.getsizeof(current) + total_size += size + continue + + # Check if the object has been seen before + if id(current) in seen: + continue + seen.add(id(current)) + + try: + size = sys.getsizeof(current) + except TypeError: + size = 0 + total_size += size + + # Handle specific cases + if isinstance(current, np.ndarray): + total_size += current.nbytes - size # Avoid double counting + elif isinstance(current, pd.DataFrame): + total_size += ( + current.memory_usage(index=True, deep=True).sum() - size + ) + elif isinstance(current, (list, tuple, set)): + objects.extend(current) + elif isinstance(current, dict): + objects.extend(current.keys()) + objects.extend(current.values()) + elif isinstance(current, TensorArrayElement): + objects.extend(current.to_numpy()) + return total_size + + # Get initial memory usage including deep introspection + memory_usage = self._table.memory_usage(index=True, deep=True) + + # TensorDtype for ray.air.util.tensor_extensions.pandas.TensorDtype + object_need_check = (TensorDtype,) + # Handle object columns separately + for column in self._table.columns: + # Check pandas object dtype and the extenstion dtype + if is_object_dtype(self._table[column].dtype) or isinstance( + self._table[column].dtype, object_need_check + ): + column_memory = 0 + for element in self._table[column]: + column_memory += get_deep_size(element) + memory_usage[column] = column_memory + + # Sum up total memory usage + total_memory_usage = memory_usage.sum() + + return int(total_memory_usage) def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame": r = self.to_pandas().copy(deep=False) diff --git a/python/ray/data/tests/test_pandas_block.py b/python/ray/data/tests/test_pandas_block.py index 4585d0e2a133..5e83abbf04d7 100644 --- a/python/ray/data/tests/test_pandas_block.py +++ b/python/ray/data/tests/test_pandas_block.py @@ -1,4 +1,10 @@ +import pickle +import random +import sys + +import numpy as np import pandas as pd +import pyarrow as pa import pytest import ray @@ -48,7 +54,226 @@ def fn2(batch): assert isinstance(block, pd.DataFrame) -if __name__ == "__main__": - import sys +class TestSizeBytes: + def test_small(ray_start_regular_shared): + animals = ["Flamingo", "Centipede"] + block = pd.DataFrame({"animals": animals}) + + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + # check that memory usage is within 10% of the size_bytes + # For strings, Pandas seems to be fairly accurate, so let's use that. + memory_usage = block.memory_usage(index=True, deep=True).sum() + assert bytes_size == pytest.approx(memory_usage, rel=0.1), ( + bytes_size, + memory_usage, + ) + + def test_large_str(ray_start_regular_shared): + animals = [ + random.choice(["alligator", "crocodile", "centipede", "flamingo"]) + for i in range(100_000) + ] + block = pd.DataFrame({"animals": animals}) + block["animals"] = block["animals"].astype("string") + + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + memory_usage = block.memory_usage(index=True, deep=True).sum() + assert bytes_size == pytest.approx(memory_usage, rel=0.1), ( + bytes_size, + memory_usage, + ) + + def test_large_str_object(ray_start_regular_shared): + """Note - this test breaks if you refactor/move the list of animals.""" + num = 100_000 + animals = [ + random.choice(["alligator", "crocodile", "centipede", "flamingo"]) + for i in range(num) + ] + block = pd.DataFrame({"animals": animals}) + + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + memory_usage = sum([sys.getsizeof(animal) for animal in animals]) + + assert bytes_size == pytest.approx(memory_usage, rel=0.1), ( + bytes_size, + memory_usage, + ) + + def test_large_floats(ray_start_regular_shared): + animals = [random.random() for i in range(100_000)] + block = pd.DataFrame({"animals": animals}) + + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + memory_usage = pickle.dumps(block).__sizeof__() + # check that memory usage is within 10% of the size_bytes + assert bytes_size == pytest.approx(memory_usage, rel=0.1), ( + bytes_size, + memory_usage, + ) + + def test_bytes_object(ray_start_regular_shared): + def generate_data(batch): + for _ in range(8): + yield {"data": [[b"\x00" * 128 * 1024 * 128]]} + + ds = ( + ray.data.range(1, override_num_blocks=1) + .map_batches(generate_data, batch_size=1) + .map_batches(lambda batch: batch, batch_format="pandas") + ) + + true_value = 128 * 1024 * 128 * 8 + for bundle in ds.iter_internal_ref_bundles(): + size = bundle.size_bytes() + # assert that true_value is within 10% of bundle.size_bytes() + assert size == pytest.approx(true_value, rel=0.1), ( + size, + true_value, + ) + + def test_nested_numpy(ray_start_regular_shared): + size = 1024 + rows = 1_000 + data = [ + np.random.randint(size=size, low=0, high=100, dtype=np.int8) + for _ in range(rows) + ] + df = pd.DataFrame({"data": data}) + + block_accessor = PandasBlockAccessor.for_block(df) + block_size = block_accessor.size_bytes() + true_value = rows * size + assert block_size == pytest.approx(true_value, rel=0.1), ( + block_size, + true_value, + ) + + def test_nested_objects(ray_start_regular_shared): + size = 10 + rows = 10_000 + lists = [[random.randint(0, 100) for _ in range(size)] for _ in range(rows)] + data = {"lists": lists} + block = pd.DataFrame(data) + + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + # List overhead + 10 integers per list + true_size = rows * ( + sys.getsizeof([random.randint(0, 100) for _ in range(size)]) + size * 28 + ) + + assert bytes_size == pytest.approx(true_size, rel=0.1), ( + bytes_size, + true_size, + ) + + def test_mixed_types(ray_start_regular_shared): + rows = 10_000 + + data = { + "integers": [random.randint(0, 100) for _ in range(rows)], + "floats": [random.random() for _ in range(rows)], + "strings": [ + random.choice(["apple", "banana", "cherry"]) for _ in range(rows) + ], + "object": [b"\x00" * 128 for _ in range(rows)], + } + block = pd.DataFrame(data) + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + # Manually calculate the size + int_size = rows * 8 + float_size = rows * 8 + str_size = sum(sys.getsizeof(string) for string in data["strings"]) + object_size = rows * sys.getsizeof(b"\x00" * 128) + + true_size = int_size + float_size + str_size + object_size + assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size) + + def test_nested_lists_strings(ray_start_regular_shared): + rows = 5_000 + nested_lists = ["a"] * 3 + ["bb"] * 4 + ["ccc"] * 3 + data = { + "nested_lists": [nested_lists for _ in range(rows)], + } + block = pd.DataFrame(data) + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + # Manually calculate the size + list_overhead = sys.getsizeof(block["nested_lists"].iloc[0]) + sum( + [sys.getsizeof(x) for x in nested_lists] + ) + true_size = rows * list_overhead + assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size) + + @pytest.mark.parametrize("size", [10, 1024]) + def test_multi_level_nesting(ray_start_regular_shared, size): + rows = 1_000 + data = { + "complex": [ + {"list": [np.random.rand(size)], "value": {"key": "val"}} + for _ in range(rows) + ], + } + block = pd.DataFrame(data) + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + numpy_size = np.random.rand(size).nbytes + + values = ["list", "value", "key", "val"] + str_size = sum([sys.getsizeof(v) for v in values]) + + list_ref_overhead = sys.getsizeof([np.random.rand(size)]) + + dict_overhead1 = sys.getsizeof({"key": "val"}) + + dict_overhead3 = sys.getsizeof( + {"list": [np.random.rand(size)], "value": {"key": "val"}} + ) + + true_size = ( + numpy_size + str_size + list_ref_overhead + dict_overhead1 + dict_overhead3 + ) * rows + assert bytes_size == pytest.approx(true_size, rel=0.15), ( + bytes_size, + true_size, + ) + + def test_boolean(ray_start_regular_shared): + data = [random.choice([True, False, None]) for _ in range(100_000)] + block = pd.DataFrame({"flags": pd.Series(data, dtype="boolean")}) + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + # No object case + true_size = block.memory_usage(index=True, deep=True).sum() + assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size) + + def test_arrow(ray_start_regular_shared): + data = [ + random.choice(["alligator", "crocodile", "flamingo"]) for _ in range(50_000) + ] + arrow_dtype = pd.ArrowDtype(pa.string()) + block = pd.DataFrame({"animals": pd.Series(data, dtype=arrow_dtype)}) + block_accessor = PandasBlockAccessor.for_block(block) + bytes_size = block_accessor.size_bytes() + + true_size = block.memory_usage(index=True, deep=True).sum() + assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size) + + +if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))