
[Data] Fix pandas memory calculation. #46939

Merged
merged 21 commits on Nov 21, 2024
61 changes: 60 additions & 1 deletion python/ray/data/_internal/pandas_block.py
@@ -1,5 +1,6 @@
import collections
import heapq
import sys
from typing import (
    TYPE_CHECKING,
    Any,
@@ -294,7 +295,65 @@ def num_rows(self) -> int:
        return self._table.shape[0]

    def size_bytes(self) -> int:
        return int(self._table.memory_usage(index=True, deep=True).sum())
        pd = lazy_import_pandas()

        def get_deep_size(obj):
            """Calculate the memory size of an object, including nested
            objects, using an iterative traversal."""
            seen = set()
            total_size = 0
            objects = collections.deque([obj])
            while objects:
                current = objects.pop()

                # Count interning-eligible immutable objects on every
                # occurrence: CPython may intern and share them, so the
                # id()-based dedup below would undercount columns in which
                # many rows hold equal values.
                if isinstance(current, (str, bytes, int, float)):
                    size = sys.getsizeof(current)
                    total_size += size
                    continue

                # Skip objects that have already been counted
                if id(current) in seen:
                    continue
                seen.add(id(current))

                try:
                    size = sys.getsizeof(current)
                except TypeError:
                    size = 0
                total_size += size

                # Handle specific container types
                if isinstance(current, np.ndarray):
                    total_size += current.nbytes - size  # Avoid double counting
                elif isinstance(current, pd.DataFrame):
                    total_size += (
                        current.memory_usage(index=True, deep=True).sum() - size
                    )
                elif isinstance(current, (list, tuple, set)):
                    objects.extend(current)
                elif isinstance(current, dict):
                    objects.extend(current.keys())
                    objects.extend(current.values())
            return total_size

        # Get initial memory usage, including deep introspection
        memory_usage = self._table.memory_usage(index=True, deep=True)

        # "python_object()" is the string form of the Arrow extension dtype
        # used for columns of serialized Python objects
        object_need_check = ["object", "python_object()"]
        # Handle object columns separately
        for column in self._table.columns:
            if str(self._table[column].dtype) in object_need_check:
Contributor:
nit, can we directly compare the dtype without casting it to strings?

@Bye-legumes (Contributor Author, Nov 21, 2024):
Just fixed now!
                column_memory = 0
                for element in self._table[column]:
                    column_memory += get_deep_size(element)
                memory_usage[column] = column_memory

        # Sum up total memory usage
        total_memory_usage = memory_usage.sum()

        return int(total_memory_usage)

    def _zip(self, acc: BlockAccessor) -> "pandas.DataFrame":
        r = self.to_pandas().copy(deep=False)
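
The change targets a real undercount: for object columns, pandas' memory_usage(index=True, deep=True) effectively sums sys.getsizeof per element, which for a container counts only the outer object, not what it references. A minimal standalone sketch of the discrepancy (sizes are CPython- and platform-dependent):

    import sys

    import pandas as pd

    df = pd.DataFrame({"lists": [[1, 2, 3] for _ in range(1_000)]})

    # Counts each outer list (header plus pointer array), but none of the ints.
    shallow = int(df.memory_usage(index=True, deep=True).sum())

    # A small int is ~28 bytes on 64-bit CPython, so roughly
    # 1_000 * 3 * 28 bytes are missing from the pandas figure.
    print(shallow, 1_000 * 3 * sys.getsizeof(1))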
236 changes: 234 additions & 2 deletions python/ray/data/tests/test_pandas_block.py
@@ -1,4 +1,10 @@
import pickle
import random
import sys

import numpy as np
import pandas as pd
import pyarrow as pa
import pytest

import ray
@@ -48,7 +54,233 @@ def fn2(batch):
    assert isinstance(block, pd.DataFrame)


if __name__ == "__main__":
    import sys


class TestSizeBytes:
    def test_small(ray_start_regular_shared):
        animals = ["Flamingo", "Centipede"]
        block = pd.DataFrame({"animals": animals})

        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        # Check that size_bytes is within 10% of pandas' own accounting.
        # For strings, pandas is fairly accurate, so use it as the reference.
        memory_usage = block.memory_usage(index=True, deep=True).sum()
        assert memory_usage * 0.9 <= bytes_size <= memory_usage * 1.1, (
            bytes_size,
            memory_usage,
        )

    def test_large_str(ray_start_regular_shared):
        animals = [
            random.choice(["alligator", "crocodile", "centipede", "flamingo"])
            for i in range(100_000)
        ]
        block = pd.DataFrame({"animals": animals})
        block["animals"] = block["animals"].astype("string")

        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        memory_usage = block.memory_usage(index=True, deep=True).sum()
        assert memory_usage * 0.9 <= bytes_size <= memory_usage * 1.1, (
            bytes_size,
            memory_usage,
        )

    def test_large_str_object(ray_start_regular_shared):
        """Note - this test breaks if you refactor/move the list of animals."""
        num = 100_000
        animals = [
            random.choice(["alligator", "crocodile", "centipede", "flamingo"])
            for i in range(num)
        ]
        block = pd.DataFrame({"animals": animals})

        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        mean_size = (
            sum(
                [
                    sys.getsizeof(animal)
                    for animal in ["alligator", "crocodile", "centipede", "flamingo"]
                ]
            )
            / 4
        )
        memory_usage = mean_size * num
        assert memory_usage * 0.9 <= bytes_size <= memory_usage * 1.1, (
            bytes_size,
            memory_usage,
        )

    def test_large_floats(ray_start_regular_shared):
        animals = [random.random() for i in range(100_000)]
        block = pd.DataFrame({"animals": animals})

        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        memory_usage = pickle.dumps(block).__sizeof__()
        # Check that memory usage is within 10% of size_bytes.
        assert memory_usage * 0.9 <= bytes_size <= memory_usage * 1.1, (
            bytes_size,
            memory_usage,
        )

    def test_bytes_object(ray_start_regular_shared):
        def generate_data(batch):
            for _ in range(8):
                yield {"data": [[b"\x00" * 128 * 1024 * 128]]}

        ds = (
            ray.data.range(1, override_num_blocks=1)
            .map_batches(generate_data, batch_size=1)
            .map_batches(lambda batch: batch, batch_format="pandas")
        )

        true_value = 128 * 1024 * 128 * 8
        for bundle in ds.iter_internal_ref_bundles():
            size = bundle.size_bytes()
            # Assert that true_value is within 10% of bundle.size_bytes().
            assert true_value * 0.9 <= size <= true_value * 1.1, (true_value, size)

    def test_unowned_numpy(ray_start_regular_shared):
        size = 1024
        rows = 1_000
        data = [
            np.random.randint(size=size, low=0, high=100, dtype=np.int8)
            for _ in range(rows)
        ]
        df = pd.DataFrame({"data": data})

        block_accessor = PandasBlockAccessor.for_block(df)
        block_size = block_accessor.size_bytes()
        true_value = rows * size
        assert true_value * 0.9 <= block_size <= true_value * 1.1

    def test_nested_objects(ray_start_regular_shared):
        size = 10
        rows = 10_000
        lists = [[random.randint(0, 100) for _ in range(size)] for _ in range(rows)]
        data = {"lists": lists}
        block = pd.DataFrame(data)

        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        # List overhead + 10 integers per list
        true_size = rows * (
            sys.getsizeof([random.randint(0, 100) for _ in range(size)]) + size * 28
        )

        assert true_size * 0.9 <= bytes_size <= true_size * 1.1, (
            bytes_size,
            true_size,
        )

    def test_mixed_types(ray_start_regular_shared):
        rows = 10_000

        data = {
            "integers": [random.randint(0, 100) for _ in range(rows)],
            "floats": [random.random() for _ in range(rows)],
            "strings": [
                random.choice(["apple", "banana", "cherry"]) for _ in range(rows)
            ],
            "object": [b"\x00" * 128 for _ in range(rows)],
        }
        block = pd.DataFrame(data)
        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        # Manually calculate the size
        int_size = rows * 8
        float_size = rows * 8
        str_size = (
            rows * sum([sys.getsizeof(s) for s in ["apple", "banana", "cherry"]]) // 3
        )
        object_size = rows * sys.getsizeof(b"\x00" * 128)

        true_size = int_size + float_size + str_size + object_size
        assert true_size * 0.9 <= bytes_size <= true_size * 1.1, (bytes_size, true_size)

    def test_nested_lists_strings(ray_start_regular_shared):
        rows = 5_000
        size = 10
        data = {
            "nested_lists": [
                [random.choice(["a", "bb", "ccc"]) for _ in range(size)]
                for _ in range(rows)
            ],
        }
        block = pd.DataFrame(data)
        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        # Manually calculate the size
        list_overhead = sys.getsizeof(
            block["nested_lists"].iloc[0]
        ) + size * sys.getsizeof("bb")
        true_size = rows * list_overhead
        assert true_size * 0.9 <= bytes_size <= true_size * 1.1, (bytes_size, true_size)

    @pytest.mark.parametrize("size", [10, 1024])
    def test_multi_level_nesting(ray_start_regular_shared, size):
        rows = 1_000
        data = {
            "complex": [
                {"list": [np.random.rand(size)], "value": {"key": "val"}}
                for _ in range(rows)
            ],
        }
        block = pd.DataFrame(data)
        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        numpy_size = np.random.rand(size).nbytes

        values = ["list", "value", "key", "val"]
        str_size = sum([sys.getsizeof(v) for v in values])

        list_ref_overhead = sys.getsizeof([np.random.rand(size)])

        dict_overhead1 = sys.getsizeof({"key": "val"})

        dict_overhead3 = sys.getsizeof(
            {"list": [np.random.rand(size)], "value": {"key": "val"}}
        )

        true_size = (
            numpy_size + str_size + list_ref_overhead + dict_overhead1 + dict_overhead3
        ) * rows
        assert true_size * 0.85 <= bytes_size <= true_size * 1.15, (
            bytes_size,
            true_size,
        )

    def test_boolean(ray_start_regular_shared):
        data = [random.choice([True, False, None]) for _ in range(100_000)]
        block = pd.DataFrame({"flags": pd.Series(data, dtype="boolean")})
        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        # No object columns, so pandas' own accounting is the ground truth.
        true_size = block.memory_usage(index=True, deep=True).sum()
        assert true_size * 0.9 <= bytes_size <= true_size * 1.1, (bytes_size, true_size)

    def test_arrow(ray_start_regular_shared):
        data = [
            random.choice(["alligator", "crocodile", "flamingo"]) for _ in range(50_000)
        ]
        arrow_dtype = pd.ArrowDtype(pa.string())
        block = pd.DataFrame({"animals": pd.Series(data, dtype=arrow_dtype)})
        block_accessor = PandasBlockAccessor.for_block(block)
        bytes_size = block_accessor.size_bytes()

        true_size = block.memory_usage(index=True, deep=True).sum()
        assert true_size * 0.9 <= bytes_size <= true_size * 1.1, (bytes_size, true_size)


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))