Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove max_tasks_per_child=1 limitation from processes executor #515

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/slow-tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Slow tests

on:
pull_request:
schedule:
# Every weekday at 03:49 UTC, see https://crontab.guru/
- cron: "49 3 * * 1-5"
Expand Down
14 changes: 11 additions & 3 deletions cubed/runtime/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,19 @@ async def async_execute_dag(
if spec is not None:
check_runtime_memory(spec, max_workers)
if use_processes:
max_tasks_per_child = kwargs.pop("max_tasks_per_child", None)
context = multiprocessing.get_context("spawn")
# max_tasks_per_child is only supported from Python 3.11
concurrent_executor = ProcessPoolExecutor(
max_workers=max_workers, mp_context=context, max_tasks_per_child=1
)
if max_tasks_per_child is None:
concurrent_executor = ProcessPoolExecutor(
max_workers=max_workers, mp_context=context
)
else:
concurrent_executor = ProcessPoolExecutor(
max_workers=max_workers,
mp_context=context,
max_tasks_per_child=max_tasks_per_child,
)
else:
concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
try:
Expand Down
138 changes: 90 additions & 48 deletions cubed/tests/test_mem_utilization.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,60 @@
import math
import platform
import shutil
import sys
from functools import partial, reduce

import pytest

from cubed.core.ops import partial_reduce
from cubed.core.optimization import multiple_inputs_optimize_dag

pytest.importorskip("lithops")

import cubed
import cubed.array_api as xp
import cubed.random
from cubed.backend_array_api import namespace as nxp
from cubed.core.ops import partial_reduce
from cubed.core.optimization import multiple_inputs_optimize_dag
from cubed.extensions.history import HistoryCallback
from cubed.extensions.mem_warn import MemoryWarningCallback
from cubed.runtime.executors.lithops import LithopsExecutor
from cubed.runtime.create import create_executor
from cubed.tests.utils import LITHOPS_LOCAL_CONFIG

# Memory budget given to the Spec in these tests (2 GB).
ALLOWED_MEM = 2_000_000_000

# Executors under test, keyed by a human-readable id used for pytest fixture ids.
EXECUTORS = {}

if platform.system() != "Windows":
    EXECUTORS["processes"] = create_executor("processes")

    # Run with max_tasks_per_child=1 so that each task is run in a new process,
    # allowing us to perform a stronger check on peak memory
    if sys.version_info >= (3, 11):
        # max_tasks_per_child is only supported on Python 3.11+
        executor_options = dict(max_tasks_per_child=1)
        EXECUTORS["processes-single-task"] = create_executor(
            "processes", executor_options
        )

try:
    executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
    EXECUTORS["lithops"] = create_executor("lithops", executor_options)
except ImportError:
    # lithops is an optional dependency; simply omit its executor if absent.
    pass


@pytest.fixture()
def spec(tmp_path, reserved_mem):
    """Return a cubed.Spec backed by a per-test temporary directory.

    Uses the module-wide ALLOWED_MEM budget together with the measured
    reserved memory from the ``reserved_mem`` fixture.
    """
    return cubed.Spec(tmp_path, allowed_mem=ALLOWED_MEM, reserved_mem=reserved_mem)


@pytest.fixture(
    scope="module",
    params=EXECUTORS.values(),
    ids=EXECUTORS.keys(),
)
def executor(request):
    """Parametrized fixture yielding each registered executor in EXECUTORS.

    Module-scoped so the (potentially expensive) executors are reused across
    all tests in this file.
    """
    return request.param


@pytest.fixture(scope="module")
def reserved_mem(executor):
    """Measure the executor's baseline (reserved) memory, with headroom.

    Returns the measured value scaled by 1.1 and rounded up to the nearest
    10 MB, so later peak-memory checks are not tripped by noise.
    """
    res = cubed.measure_reserved_mem(executor) * 1.1  # add some wiggle room
    return round_up_to_multiple(res, 10_000_000)  # round up to nearest multiple of 10MB

Expand All @@ -40,58 +68,58 @@ def round_up_to_multiple(x, multiple=10):


@pytest.mark.slow
def test_index(tmp_path, spec, executor):
    """Check memory behaviour of basic slicing (drop the first row)."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = a[1:, :]
    run_operation(tmp_path, executor, "index", b)


@pytest.mark.slow
def test_index_step(tmp_path, spec, executor):
    """Check memory behaviour of strided slicing (every other row)."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = a[::2, :]
    run_operation(tmp_path, executor, "index_step", b)


# Creation Functions


@pytest.mark.slow
def test_eye(tmp_path, spec, executor):
    """Check memory behaviour of the eye creation function."""
    a = xp.eye(10000, 10000, chunks=(5000, 5000), spec=spec)
    run_operation(tmp_path, executor, "eye", a)


@pytest.mark.slow
def test_tril(tmp_path, spec, executor):
    """Check memory behaviour of extracting the lower triangle."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.tril(a)
    run_operation(tmp_path, executor, "tril", b)


# Elementwise Functions


@pytest.mark.slow
def test_add(tmp_path, spec, executor):
    """Check memory behaviour of elementwise addition of two arrays."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    c = xp.add(a, b)
    run_operation(tmp_path, executor, "add", c)


@pytest.mark.slow
def test_add_reduce_left(tmp_path, spec):
def test_add_reduce_left(tmp_path, spec, executor):
# Perform the `add` operation repeatedly on pairs of arrays, also known as fold left.
# See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
#
Expand All @@ -111,11 +139,13 @@ def test_add_reduce_left(tmp_path, spec):
]
result = reduce(lambda x, y: xp.add(x, y), arrs)
opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
run_operation(tmp_path, "add_reduce_left", result, optimize_function=opt_fn)
run_operation(
tmp_path, executor, "add_reduce_left", result, optimize_function=opt_fn
)


@pytest.mark.slow
def test_add_reduce_right(tmp_path, spec):
def test_add_reduce_right(tmp_path, spec, executor):
# Perform the `add` operation repeatedly on pairs of arrays, also known as fold right.
# See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
#
Expand All @@ -137,23 +167,25 @@ def test_add_reduce_right(tmp_path, spec):
]
result = reduce(lambda x, y: xp.add(y, x), reversed(arrs))
opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
run_operation(tmp_path, "add_reduce_right", result, optimize_function=opt_fn)
run_operation(
tmp_path, executor, "add_reduce_right", result, optimize_function=opt_fn
)


@pytest.mark.slow
def test_negative(tmp_path, spec, executor):
    """Check memory behaviour of elementwise negation."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.negative(a)
    run_operation(tmp_path, executor, "negative", b)


# Linear Algebra Functions


@pytest.mark.slow
def test_matmul(tmp_path, spec):
def test_matmul(tmp_path, spec, executor):
a = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
Expand All @@ -163,20 +195,20 @@ def test_matmul(tmp_path, spec):
c = xp.astype(a, xp.float32)
d = xp.astype(b, xp.float32)
e = xp.matmul(c, d)
run_operation(tmp_path, "matmul", e)
run_operation(tmp_path, executor, "matmul", e)


@pytest.mark.slow
def test_matrix_transpose(tmp_path, spec, executor):
    """Check memory behaviour of matrix transpose."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.matrix_transpose(a)
    run_operation(tmp_path, executor, "matrix_transpose", b)


@pytest.mark.slow
def test_tensordot(tmp_path, spec):
def test_tensordot(tmp_path, spec, executor):
a = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
Expand All @@ -186,14 +218,14 @@ def test_tensordot(tmp_path, spec):
c = xp.astype(a, xp.float32)
d = xp.astype(b, xp.float32)
e = xp.tensordot(c, d, axes=1)
run_operation(tmp_path, "tensordot", e)
run_operation(tmp_path, executor, "tensordot", e)


# Manipulation Functions


@pytest.mark.slow
def test_concat(tmp_path, spec):
def test_concat(tmp_path, spec, executor):
# Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries
a = cubed.random.random(
(9999, 10000), chunks=(5000, 5000), spec=spec
Expand All @@ -202,81 +234,80 @@ def test_concat(tmp_path, spec):
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
c = xp.concat((a, b), axis=0)
run_operation(tmp_path, "concat", c)
run_operation(tmp_path, executor, "concat", c)


@pytest.mark.slow
def test_reshape(tmp_path, spec, executor):
    """Check memory behaviour of a two-step reshape."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    # need intermediate reshape due to limitations in Dask's reshape_rechunk
    b = xp.reshape(a, (5000, 2, 10000))
    c = xp.reshape(b, (5000, 20000))
    run_operation(tmp_path, executor, "reshape", c)


@pytest.mark.slow
def test_stack(tmp_path, spec, executor):
    """Check memory behaviour of stacking two arrays along a new axis."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    c = xp.stack((a, b), axis=0)
    run_operation(tmp_path, executor, "stack", c)


# Searching Functions


@pytest.mark.slow
def test_argmax(tmp_path, spec, executor):
    """Check memory behaviour of argmax along an axis."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.argmax(a, axis=0)
    run_operation(tmp_path, executor, "argmax", b)


# Statistical Functions


@pytest.mark.slow
def test_max(tmp_path, spec, executor):
    """Check memory behaviour of max along an axis."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.max(a, axis=0)
    run_operation(tmp_path, executor, "max", b)


@pytest.mark.slow
def test_mean(tmp_path, spec, executor):
    """Check memory behaviour of mean along an axis."""
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = xp.mean(a, axis=0)
    run_operation(tmp_path, executor, "mean", b)


@pytest.mark.slow
def test_sum_partial_reduce(tmp_path, spec, executor):
    """Check memory behaviour of a partial sum-reduction over axis 0."""
    a = cubed.random.random(
        (40000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    # split_every={0: 8} combines 8 chunks along axis 0 per reduction step
    b = partial_reduce(a, nxp.sum, split_every={0: 8})
    run_operation(tmp_path, executor, "sum_partial_reduce", b)


# Internal functions


def run_operation(tmp_path, name, result_array, *, optimize_function=None):
def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
# result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
# result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
hist = HistoryCallback()
mem_warn = MemoryWarningCallback()
# use store=None to write to temporary zarr
Expand All @@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None):
df = hist.stats_df
print(df)

# check peak memory does not exceed allowed mem
assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()

# check change in peak memory is no more than projected mem
assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()

# check projected_mem_utilization does not exceed 1
assert (df["projected_mem_utilization"] <= 1.0).all()
# except on processes executor that runs multiple tasks in a process
if (
executor.name != "processes"
or executor.kwargs.get("max_tasks_per_child", None) == 1
):
assert (df["projected_mem_utilization"] <= 1.0).all()

# delete temp files for this test immediately since they are so large
shutil.rmtree(tmp_path)
7 changes: 2 additions & 5 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import platform
import sys
from typing import Iterable

import networkx as nx
Expand Down Expand Up @@ -29,10 +28,8 @@
# ThreadsExecutor calls `peak_measured_mem` which is not supported on Windows
ALL_EXECUTORS.append(create_executor("threads"))

# ProcessesExecutor uses an API available from 3.11 onwards (max_tasks_per_child)
if sys.version_info >= (3, 11):
ALL_EXECUTORS.append(create_executor("processes"))
MAIN_EXECUTORS.append(create_executor("processes"))
ALL_EXECUTORS.append(create_executor("processes"))
MAIN_EXECUTORS.append(create_executor("processes"))

try:
ALL_EXECUTORS.append(create_executor("beam"))
Expand Down
Loading