[Data] Fix pandas memory calculation. (ray-project#46939)
## Why are these changes needed?

close ray-project#46785
Currently, the memory usage reported for a pandas block is inaccurate when a column holds Python objects, so we compute it by traversing nested objects as well.
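For context, a minimal sketch of the discrepancy (the column name and sizes are illustrative, not taken from the issue): pandas' `memory_usage(deep=True)` applies `sys.getsizeof` to each element of an object column, which for a container such as a list only counts the container itself, not the NumPy arrays or strings nested inside it.

```python
import sys

import numpy as np
import pandas as pd

# An object column whose elements are lists wrapping ~1 KiB NumPy arrays.
df = pd.DataFrame({"data": [[np.zeros(1024, dtype=np.int8)] for _ in range(1000)]})

# Reported size: sys.getsizeof() of each list counts only the list object
# itself (a few dozen bytes), not the array it references.
reported = df["data"].memory_usage(index=False, deep=True)

# A rough deep count that also follows the nested arrays.
actual = sum(
    sys.getsizeof(lst) + sum(arr.nbytes for arr in lst) for lst in df["data"]
)

print(reported, actual)  # reported is far smaller than actual
```

The change below applies this kind of deep traversal when a column has an object (or tensor extension) dtype.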
## Related issue number

closes ray-project#46785, closes
ray-project#48506

## Checks

- [√] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [√] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [√] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: zhilong <zhilong.chen@mail.mcgill.ca>
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
2 people authored and MortalHappiness committed Nov 22, 2024
1 parent d5c23c2 commit aaffdb1
Showing 2 changed files with 296 additions and 3 deletions.
70 changes: 69 additions & 1 deletion python/ray/data/_internal/pandas_block.py
@@ -1,5 +1,6 @@
import collections
import heapq
import sys
from typing import (
TYPE_CHECKING,
Any,
@@ -294,7 +295,74 @@ def num_rows(self) -> int:
return self._table.shape[0]

def size_bytes(self) -> int:
return int(self._table.memory_usage(index=True, deep=True).sum())
from pandas.api.types import is_object_dtype

from ray.data.extensions import TensorArrayElement, TensorDtype

pd = lazy_import_pandas()

def get_deep_size(obj):
"""Calculates the memory size of objects,
including nested objects using an iterative approach."""
seen = set()
total_size = 0
objects = collections.deque([obj])
while objects:
current = objects.pop()

# Skip interning-eligible immutable objects
if isinstance(current, (str, bytes, int, float)):
size = sys.getsizeof(current)
total_size += size
continue

# Check if the object has been seen before
if id(current) in seen:
continue
seen.add(id(current))

try:
size = sys.getsizeof(current)
except TypeError:
size = 0
total_size += size

# Handle specific cases
if isinstance(current, np.ndarray):
total_size += current.nbytes - size # Avoid double counting
elif isinstance(current, pd.DataFrame):
total_size += (
current.memory_usage(index=True, deep=True).sum() - size
)
elif isinstance(current, (list, tuple, set)):
objects.extend(current)
elif isinstance(current, dict):
objects.extend(current.keys())
objects.extend(current.values())
elif isinstance(current, TensorArrayElement):
objects.extend(current.to_numpy())
return total_size

# Get initial memory usage including deep introspection
memory_usage = self._table.memory_usage(index=True, deep=True)

# TensorDtype for ray.air.util.tensor_extensions.pandas.TensorDtype
object_need_check = (TensorDtype,)
# Handle object columns separately
for column in self._table.columns:
# Check pandas object dtype and the extension dtype
if is_object_dtype(self._table[column].dtype) or isinstance(
self._table[column].dtype, object_need_check
):
column_memory = 0
for element in self._table[column]:
column_memory += get_deep_size(element)
memory_usage[column] = column_memory

# Sum up total memory usage
total_memory_usage = memory_usage.sum()

return int(total_memory_usage)

def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
r = self.to_pandas().copy(deep=False)
229 changes: 227 additions & 2 deletions python/ray/data/tests/test_pandas_block.py
@@ -1,4 +1,10 @@
import pickle
import random
import sys

import numpy as np
import pandas as pd
import pyarrow as pa
import pytest

import ray
@@ -48,7 +54,226 @@ def fn2(batch):
assert isinstance(block, pd.DataFrame)


if __name__ == "__main__":
import sys
class TestSizeBytes:
def test_small(ray_start_regular_shared):
animals = ["Flamingo", "Centipede"]
block = pd.DataFrame({"animals": animals})

block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

# check that memory usage is within 10% of the size_bytes
# For strings, Pandas seems to be fairly accurate, so let's use that.
memory_usage = block.memory_usage(index=True, deep=True).sum()
assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
bytes_size,
memory_usage,
)

def test_large_str(ray_start_regular_shared):
animals = [
random.choice(["alligator", "crocodile", "centipede", "flamingo"])
for i in range(100_000)
]
block = pd.DataFrame({"animals": animals})
block["animals"] = block["animals"].astype("string")

block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

memory_usage = block.memory_usage(index=True, deep=True).sum()
assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
bytes_size,
memory_usage,
)

def test_large_str_object(ray_start_regular_shared):
"""Note - this test breaks if you refactor/move the list of animals."""
num = 100_000
animals = [
random.choice(["alligator", "crocodile", "centipede", "flamingo"])
for i in range(num)
]
block = pd.DataFrame({"animals": animals})

block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

memory_usage = sum([sys.getsizeof(animal) for animal in animals])

assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
bytes_size,
memory_usage,
)

def test_large_floats(ray_start_regular_shared):
animals = [random.random() for i in range(100_000)]
block = pd.DataFrame({"animals": animals})

block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

memory_usage = pickle.dumps(block).__sizeof__()
# check that memory usage is within 10% of the size_bytes
assert bytes_size == pytest.approx(memory_usage, rel=0.1), (
bytes_size,
memory_usage,
)

def test_bytes_object(ray_start_regular_shared):
def generate_data(batch):
for _ in range(8):
yield {"data": [[b"\x00" * 128 * 1024 * 128]]}

ds = (
ray.data.range(1, override_num_blocks=1)
.map_batches(generate_data, batch_size=1)
.map_batches(lambda batch: batch, batch_format="pandas")
)

true_value = 128 * 1024 * 128 * 8
for bundle in ds.iter_internal_ref_bundles():
size = bundle.size_bytes()
# assert that true_value is within 10% of bundle.size_bytes()
assert size == pytest.approx(true_value, rel=0.1), (
size,
true_value,
)

def test_nested_numpy(ray_start_regular_shared):
size = 1024
rows = 1_000
data = [
np.random.randint(size=size, low=0, high=100, dtype=np.int8)
for _ in range(rows)
]
df = pd.DataFrame({"data": data})

block_accessor = PandasBlockAccessor.for_block(df)
block_size = block_accessor.size_bytes()
true_value = rows * size
assert block_size == pytest.approx(true_value, rel=0.1), (
block_size,
true_value,
)

def test_nested_objects(ray_start_regular_shared):
size = 10
rows = 10_000
lists = [[random.randint(0, 100) for _ in range(size)] for _ in range(rows)]
data = {"lists": lists}
block = pd.DataFrame(data)

block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

# List overhead + 10 integers per list
true_size = rows * (
sys.getsizeof([random.randint(0, 100) for _ in range(size)]) + size * 28
)

assert bytes_size == pytest.approx(true_size, rel=0.1), (
bytes_size,
true_size,
)

def test_mixed_types(ray_start_regular_shared):
rows = 10_000

data = {
"integers": [random.randint(0, 100) for _ in range(rows)],
"floats": [random.random() for _ in range(rows)],
"strings": [
random.choice(["apple", "banana", "cherry"]) for _ in range(rows)
],
"object": [b"\x00" * 128 for _ in range(rows)],
}
block = pd.DataFrame(data)
block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

# Manually calculate the size
int_size = rows * 8
float_size = rows * 8
str_size = sum(sys.getsizeof(string) for string in data["strings"])
object_size = rows * sys.getsizeof(b"\x00" * 128)

true_size = int_size + float_size + str_size + object_size
assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)

def test_nested_lists_strings(ray_start_regular_shared):
rows = 5_000
nested_lists = ["a"] * 3 + ["bb"] * 4 + ["ccc"] * 3
data = {
"nested_lists": [nested_lists for _ in range(rows)],
}
block = pd.DataFrame(data)
block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

# Manually calculate the size
list_overhead = sys.getsizeof(block["nested_lists"].iloc[0]) + sum(
[sys.getsizeof(x) for x in nested_lists]
)
true_size = rows * list_overhead
assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)

@pytest.mark.parametrize("size", [10, 1024])
def test_multi_level_nesting(ray_start_regular_shared, size):
rows = 1_000
data = {
"complex": [
{"list": [np.random.rand(size)], "value": {"key": "val"}}
for _ in range(rows)
],
}
block = pd.DataFrame(data)
block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

numpy_size = np.random.rand(size).nbytes

values = ["list", "value", "key", "val"]
str_size = sum([sys.getsizeof(v) for v in values])

list_ref_overhead = sys.getsizeof([np.random.rand(size)])

dict_overhead1 = sys.getsizeof({"key": "val"})

dict_overhead3 = sys.getsizeof(
{"list": [np.random.rand(size)], "value": {"key": "val"}}
)

true_size = (
numpy_size + str_size + list_ref_overhead + dict_overhead1 + dict_overhead3
) * rows
assert bytes_size == pytest.approx(true_size, rel=0.15), (
bytes_size,
true_size,
)

def test_boolean(ray_start_regular_shared):
data = [random.choice([True, False, None]) for _ in range(100_000)]
block = pd.DataFrame({"flags": pd.Series(data, dtype="boolean")})
block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

# No object case
true_size = block.memory_usage(index=True, deep=True).sum()
assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)

def test_arrow(ray_start_regular_shared):
data = [
random.choice(["alligator", "crocodile", "flamingo"]) for _ in range(50_000)
]
arrow_dtype = pd.ArrowDtype(pa.string())
block = pd.DataFrame({"animals": pd.Series(data, dtype=arrow_dtype)})
block_accessor = PandasBlockAccessor.for_block(block)
bytes_size = block_accessor.size_bytes()

true_size = block.memory_usage(index=True, deep=True).sum()
assert bytes_size == pytest.approx(true_size, rel=0.1), (bytes_size, true_size)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
