Memray integration #558

Merged · 5 commits · Aug 27, 2024
12 changes: 10 additions & 2 deletions cubed/runtime/executors/lithops.py
@@ -27,12 +27,17 @@
 )
 from cubed.runtime.pipeline import visit_node_generations, visit_nodes
 from cubed.runtime.types import Callback, DagExecutor
-from cubed.runtime.utils import handle_callbacks, handle_operation_start_callbacks
+from cubed.runtime.utils import (
+    handle_callbacks,
+    handle_operation_start_callbacks,
+    profile_memray,
+)
 from cubed.spec import Spec

 logger = logging.getLogger(__name__)


+@profile_memray
 def run_func(input, func=None, config=None, name=None, compute_id=None):
     result = func(input, config=config)
     return result
@@ -171,6 +176,7 @@ def execute_dag(
 ) -> None:
     use_backups = kwargs.pop("use_backups", True)
     wait_dur_sec = kwargs.pop("wait_dur_sec", None)
+    compute_id = kwargs.pop("compute_id")
     allowed_mem = spec.allowed_mem if spec is not None else None
     function_executor = FunctionExecutor(**kwargs)
     runtime_memory_mb = function_executor.config[function_executor.backend].get(
@@ -199,6 +205,7 @@
                     func=pipeline.function,
                     config=pipeline.config,
                     name=name,
+                    compute_id=compute_id,
                 ):
                     handle_callbacks(callbacks, stats)
         else:
@@ -224,7 +231,8 @@
                     use_backups=use_backups,
                     return_stats=True,
                     wait_dur_sec=wait_dur_sec,
-                    # TODO: kwargs
+                    # TODO: other kwargs (func, config, name)
+                    compute_id=compute_id,
                 ):
                     handle_callbacks(callbacks, stats)

2 changes: 2 additions & 0 deletions cubed/runtime/executors/local.py
@@ -19,6 +19,7 @@
     execution_stats,
     handle_callbacks,
     handle_operation_start_callbacks,
+    profile_memray,
 )
 from cubed.spec import Spec

@@ -59,6 +60,7 @@ def execute_dag(
     [callback.on_task_end(event) for callback in callbacks]


+@profile_memray
 @execution_stats
 def run_func(input, func=None, config=None, name=None, compute_id=None):
     return func(input, config=config)
33 changes: 33 additions & 0 deletions cubed/runtime/utils.py
@@ -1,10 +1,17 @@
 import time
+from contextlib import nullcontext
 from functools import partial
 from itertools import islice
+from pathlib import Path

 from cubed.runtime.types import OperationStartEvent, TaskEndEvent
 from cubed.utils import peak_measured_mem

+try:
+    import memray
+except ImportError:
+    memray = None
+
 sym_counter = 0
@@ -39,6 +46,32 @@ def execution_stats(func):
     return partial(execute_with_stats, func)


+def execute_with_memray(function, input, **kwargs):
+    # only run memray for first input (and only for operations that run on block locations)
+    compute_id = kwargs.get("compute_id", None)
+    name = kwargs["name"]
+    if (
+        compute_id is not None
+        and memray is not None
+        and isinstance(input, list)
+        and all(isinstance(i, int) for i in input)
+        and sum(input) == 0
+    ):
+        memray_dir = Path(f"history/{compute_id}/memray")
+        memray_dir.mkdir(parents=True, exist_ok=True)
+        cm = memray.Tracker(memray_dir / f"{name}.bin")
+    else:
+        cm = nullcontext()
+    with cm:
+        result = function(input, **kwargs)
+    return result
+
+
+def profile_memray(func):
+    """Decorator to profile a function call with memray."""
+    return partial(execute_with_memray, func)
+
+
 def handle_operation_start_callbacks(callbacks, name):
     if callbacks is not None:
         event = OperationStartEvent(name)
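The first-block gating above can be tried outside a full computation. A minimal standalone sketch (assuming cubed and memray are installed; the `work` function and the `compute_id` value are hypothetical, for illustration only):

```python
from cubed.runtime.utils import profile_memray


@profile_memray
def work(input, name=None, compute_id=None):
    return [i + 1 for i in input]


# block index (0, 0) sums to zero, so this call runs under memray.Tracker
# and writes a trace to history/example-compute/memray/op.bin
work([0, 0], name="op", compute_id="example-compute")

# any other block index runs under nullcontext -- no trace is written
work([0, 1], name="op", compute_id="example-compute")
```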
Binary file added docs/images/memray-add.png
38 changes: 38 additions & 0 deletions docs/user-guide/diagnostics.md
@@ -92,3 +92,41 @@

### Examples in use
See the [examples](https://github.com/cubed-dev/cubed/blob/main/examples/README.md) for more information about how to use them.

## Memray

[Memray](https://github.com/bloomberg/memray), a memory profiler for Python, can be used to track and view memory allocations when running a single task in a Cubed computation.

This is not usually needed when using Cubed, but for developers writing new operations, improving projected memory sizes, or debugging a memory issue, it can be very useful to understand how memory is actually allocated in Cubed.

To enable Memray memory profiling in Cubed, install memray (`pip install memray`), then use a local executor that runs tasks in separate processes, such as `processes` (Python 3.11 or later) or `lithops`. When you run a computation, Cubed enables Memray for the first task in each operation only, so if an array has 100 chunks, each operation still produces just one Memray trace.
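For example, the processes executor can be selected when creating the spec. A minimal sketch (the `executor_name` option and the memory value are assumptions here — check the executors documentation for your Cubed version):

```python
import cubed

# assumed configuration, for illustration only; "processes" needs Python 3.11+
spec = cubed.Spec(allowed_mem="2GB", executor_name="processes")
```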

Here is an example of a simple addition operation, with 200MB chunks. (It is adapted from [test_mem_utilization.py](https://github.com/cubed-dev/cubed/blob/main/cubed/tests/test_mem_utilization.py) in Cubed's test suite.)

```python
import cubed.array_api as xp
import cubed.random

a = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
b = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
c = xp.add(a, b)
c.compute(optimize_graph=False)
```

The optimizer is turned off so that generation of the random arrays is not fused with the add operation. This way we can see the memory allocations for that operation alone.

After the computation is complete there will be a collection of `.bin` files in the `history/compute-{id}/memray` directory, one for each operation. To view them, convert them to HTML flame graphs as follows:

```shell
(cd $(ls -d history/compute-* | tail -1)/memray; for f in $(ls *.bin); do echo $f; python -m memray flamegraph --temporal -f -o $f.html $f; done)
```
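The same conversion written out as a plain loop (`--temporal` adds a time axis to the flame graph, and `-f` overwrites any existing HTML output):

```shell
cd $(ls -d history/compute-* | tail -1)/memray
for f in *.bin; do
    python -m memray flamegraph --temporal -f -o $f.html $f
done
```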

Here is the flame graph for the add operation:

![Memray temporal view of an 'add' operation](../images/memray-add.png)

Annotations have been added to explain what is going on in this example. Note that reading a chunk from Zarr requires twice the chunk memory (400MB), since there is a buffer for the compressed Zarr block (200MB) as well as the resulting array (200MB). After the first chunk has been loaded, memory dips back to 200MB since the compressed buffer is no longer retained.
2 changes: 2 additions & 0 deletions setup.cfg
@@ -40,6 +40,8 @@
 ignore_missing_imports = True
 [mypy-IPython.*]
 ignore_missing_imports = True
+[mypy-memray.*]
+ignore_missing_imports = True
 [mypy-modal.*]
 ignore_missing_imports = True
 [mypy-matplotlib.*]