Enable backup tasks by default for all executors that support them #378

Merged
1 commit merged on Feb 12, 2024
2 changes: 1 addition & 1 deletion cubed/runtime/executors/asyncio.py
@@ -11,7 +11,7 @@
async def async_map_unordered(
create_futures_func: Callable[..., List[Tuple[Any, Future]]],
input: Iterable[Any],
use_backups: bool = False,
use_backups: bool = True,
create_backup_futures_func: Optional[
Callable[..., List[Tuple[Any, Future]]]
] = None,
2 changes: 1 addition & 1 deletion cubed/runtime/executors/dask_distributed_async.py
@@ -36,7 +36,7 @@ async def map_unordered(
map_function: Callable[..., Any],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
retries: int = 2,
use_backups: bool = False,
use_backups: bool = True,
batch_size: Optional[int] = None,
return_stats: bool = False,
name: Optional[str] = None,
6 changes: 3 additions & 3 deletions cubed/runtime/executors/lithops.py
@@ -48,7 +48,7 @@ def map_unordered(
include_modules: List[str] = [],
timeout: Optional[int] = None,
retries: int = 2,
use_backups: bool = False,
use_backups: bool = True,
return_stats: bool = False,
**kwargs,
) -> Iterator[Any]:
@@ -138,7 +138,7 @@ def map_unordered(
future, now, start_times[group_name], end_times[group_name]
):
input = future.input
logger.info("Running backup task for %s", input)
logger.warn("Running backup task for %s", input)
futures = lithops_function_executor.map(
group_name_to_function[group_name],
[input],
@@ -166,7 +166,7 @@ def execute_dag(
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
use_backups = kwargs.pop("use_backups", False)
use_backups = kwargs.pop("use_backups", True)
allowed_mem = spec.allowed_mem if spec is not None else None
function_executor = FunctionExecutor(**kwargs)
runtime_memory_mb = function_executor.config[function_executor.backend].get(
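The `execute_dag` hunk above also shows how the new default reaches Lithops: `use_backups` is popped from `**kwargs` (now defaulting to `True`) before the remaining keyword arguments are handed to the `FunctionExecutor`. Below is a minimal, hypothetical sketch of that pattern — the `runtime` argument is just a stand-in, not taken from this PR.

```python
# Sketch of the kwargs.pop pattern used in execute_dag above: use_backups is
# popped (now defaulting to True) so the remaining keyword arguments can still
# be forwarded untouched to Lithops' FunctionExecutor.
def execute_dag_sketch(**kwargs) -> None:
    use_backups = kwargs.pop("use_backups", True)
    print(f"use_backups={use_backups}, kwargs left for FunctionExecutor: {kwargs}")


execute_dag_sketch(runtime="my-runtime", use_backups=False)  # explicit opt-out
execute_dag_sketch(runtime="my-runtime")                     # new default: True
```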
2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal_async.py
@@ -26,7 +26,7 @@
async def map_unordered(
app_function: Function,
input: Iterable[Any],
use_backups: bool = False,
use_backups: bool = True,
backup_function: Optional[Function] = None,
batch_size: Optional[int] = None,
return_stats: bool = False,
2 changes: 1 addition & 1 deletion cubed/runtime/executors/python_async.py
@@ -27,7 +27,7 @@ async def map_unordered(
function: Callable[..., Any],
input: Iterable[Any],
retries: int = 2,
use_backups: bool = False,
use_backups: bool = True,
batch_size: Optional[int] = None,
return_stats: bool = False,
name: Optional[str] = None,
8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_dask_distributed_async.py
@@ -43,12 +43,14 @@ async def run_test(function, input, retries, use_backups=False, batch_size=None)
],
)
# fmt: on
def test_success(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

@@ -66,13 +68,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
def test_failure(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_lithops.py
@@ -44,11 +44,13 @@ def run_test(function, input, retries, timeout=10, use_backups=False):
],
)
# fmt: on
def test_success(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)

assert outputs == set(range(n_tasks))
@@ -65,12 +67,14 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
def test_failure(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)

check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
14 changes: 10 additions & 4 deletions cubed/tests/runtime/test_modal_async.py
@@ -80,15 +80,17 @@ async def run_test(app_function, input, use_backups=False, batch_size=None, **kw
],
)
# fmt: on
@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
def test_success(timing_map, n_tasks, retries):
def test_success(timing_map, n_tasks, retries, use_backups):
try:
outputs = asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
use_backups=use_backups,
path=tmp_path,
timing_map=timing_map
timing_map=timing_map,
)
)

@@ -109,14 +111,16 @@ def test_success(timing_map, n_tasks, retries):
],
)
# fmt: on
@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
def test_failure(timing_map, n_tasks, retries):
def test_failure(timing_map, n_tasks, retries, use_backups):
try:
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
use_backups=use_backups,
path=tmp_path,
timing_map=timing_map,
)
@@ -137,13 +141,15 @@ def test_failure(timing_map, n_tasks, retries):
],
)
# fmt: on
@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
def test_large_number_of_tasks(timing_map, n_tasks, retries):
def test_large_number_of_tasks(timing_map, n_tasks, retries, use_backups):
try:
outputs = asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
use_backups=use_backups,
path=tmp_path,
timing_map=timing_map
)
8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_python_async.py
@@ -40,12 +40,14 @@ async def run_test(function, input, retries=2, use_backups=False, batch_size=Non
],
)
# fmt: on
def test_success(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

@@ -62,13 +64,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
def test_failure(tmp_path, timing_map, n_tasks, retries):
@pytest.mark.parametrize("use_backups", [False, True])
def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

2 changes: 1 addition & 1 deletion docs/user-guide/reliability.md
@@ -23,4 +23,4 @@ A few slow running tasks (called stragglers) can disproportionately slow down th

When a backup task is launched the original task is not cancelled, so it is to be expected that both tasks will complete and write their (identical) output. This only works since tasks are idempotent and each write a single, whole Zarr chunk in an atomic operation. (Updates to a single key are atomic in both [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel) and Google Cloud Storage.)

Note that this feature is experimental and disabled by default since it has not been tested at scale yet.
Backup tasks are enabled by default, but if you need to turn them off you can do so with ``use_backups=False``.
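To make the mechanism described in the paragraph above concrete, here is a simplified, self-contained illustration of the backup-task idea — not Cubed's actual implementation. A task that has not finished within a straggler timeout gets a duplicate launched alongside it, and whichever copy finishes first supplies the result; the original is left running, which is safe only because each task idempotently writes a whole Zarr chunk in an atomic operation.

```python
# Simplified illustration of backup tasks (not Cubed's implementation).
# If the primary task looks like a straggler, launch a duplicate and take
# whichever copy finishes first; both may eventually complete and write the
# same chunk, which is harmless because the write is idempotent and atomic.
import asyncio
import random


async def run_chunk_task(chunk_id: int) -> int:
    # Stand-in for computing and writing one Zarr chunk.
    await asyncio.sleep(random.uniform(0.1, 0.5))
    return chunk_id


async def with_backup(chunk_id: int, straggler_timeout: float = 0.3) -> int:
    primary = asyncio.ensure_future(run_chunk_task(chunk_id))
    done, _ = await asyncio.wait({primary}, timeout=straggler_timeout)
    if done:
        return primary.result()
    # Primary is running slowly: start a backup and return the first result.
    backup = asyncio.ensure_future(run_chunk_task(chunk_id))
    done, _ = await asyncio.wait(
        {primary, backup}, return_when=asyncio.FIRST_COMPLETED
    )
    return done.pop().result()


async def main() -> None:
    results = await asyncio.gather(*(with_backup(i) for i in range(8)))
    print(sorted(results))


asyncio.run(main())
```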
3 changes: 2 additions & 1 deletion docs/user-guide/scaling.md
@@ -35,7 +35,8 @@ Weak scaling requires more workers than output chunks, so for large problems it
With fewer workers than chunks we would expect linear strong scaling, as every new worker added has nothing to wait for.

Stragglers are tasks that take much longer than average, who disproportionately hold up the next step of the computation.
To handle stragglers, you should consider turning on backups (with ``use_backups=True``), as any failures that are restarted essentially become stragglers.
Stragglers are handled by running backup tasks for any tasks that are running very slowly. This feature is enabled by default, but
if you need to turn it off you can do so with ``use_backups=False``.
Worker start-up time is another practical speed consideration, though it would delay computations of all scales equally.

### Multi-step Calculation
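For reference, a hypothetical opt-out sketch now that the default has flipped. The executor class name (`LithopsDagExecutor`) and the assumption that keyword arguments passed to `compute()` reach `execute_dag(**kwargs)` are inferred from this diff rather than quoted from the Cubed docs, so check the names against your Cubed version.

```python
# Hypothetical sketch of turning backups back off (names are assumptions).
import cubed
import cubed.array_api as xp
from cubed.runtime.executors.lithops import LithopsDagExecutor  # class name assumed

spec = cubed.Spec(work_dir="s3://example-bucket/tmp", allowed_mem="2GB")  # placeholder bucket
a = xp.ones((20000, 20000), chunks=(5000, 5000), spec=spec)
b = a + 1

# Keyword arguments to compute() are assumed to be forwarded to the executor's
# execute_dag(), where use_backups is popped (defaulting to True after this PR).
b.compute(executor=LithopsDagExecutor(), use_backups=False)
```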