diff --git a/cubed/runtime/executors/asyncio.py b/cubed/runtime/executors/asyncio.py
index 0ab20900..3e75b5ef 100644
--- a/cubed/runtime/executors/asyncio.py
+++ b/cubed/runtime/executors/asyncio.py
@@ -11,7 +11,7 @@
 async def async_map_unordered(
     create_futures_func: Callable[..., List[Tuple[Any, Future]]],
     input: Iterable[Any],
-    use_backups: bool = False,
+    use_backups: bool = True,
     create_backup_futures_func: Optional[
         Callable[..., List[Tuple[Any, Future]]]
     ] = None,
diff --git a/cubed/runtime/executors/dask_distributed_async.py b/cubed/runtime/executors/dask_distributed_async.py
index 8acd0834..5a452a16 100644
--- a/cubed/runtime/executors/dask_distributed_async.py
+++ b/cubed/runtime/executors/dask_distributed_async.py
@@ -36,7 +36,7 @@ async def map_unordered(
     map_function: Callable[..., Any],
     map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
     retries: int = 2,
-    use_backups: bool = False,
+    use_backups: bool = True,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
     name: Optional[str] = None,
diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py
index 25d9b93a..1dfef6ad 100644
--- a/cubed/runtime/executors/lithops.py
+++ b/cubed/runtime/executors/lithops.py
@@ -48,7 +48,7 @@ def map_unordered(
     include_modules: List[str] = [],
     timeout: Optional[int] = None,
     retries: int = 2,
-    use_backups: bool = False,
+    use_backups: bool = True,
     return_stats: bool = False,
     **kwargs,
 ) -> Iterator[Any]:
@@ -138,7 +138,7 @@ def map_unordered(
                     future, now, start_times[group_name], end_times[group_name]
                 ):
                     input = future.input
-                    logger.info("Running backup task for %s", input)
+                    logger.warning("Running backup task for %s", input)
                     futures = lithops_function_executor.map(
                         group_name_to_function[group_name],
                         [input],
@@ -166,7 +166,7 @@ def execute_dag(
     compute_arrays_in_parallel: Optional[bool] = None,
     **kwargs,
 ) -> None:
-    use_backups = kwargs.pop("use_backups", False)
+    use_backups = kwargs.pop("use_backups", True)
     allowed_mem = spec.allowed_mem if spec is not None else None
     function_executor = FunctionExecutor(**kwargs)
     runtime_memory_mb = function_executor.config[function_executor.backend].get(
diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py
index 91725336..711ece8b 100644
--- a/cubed/runtime/executors/modal_async.py
+++ b/cubed/runtime/executors/modal_async.py
@@ -26,7 +26,7 @@
 async def map_unordered(
     app_function: Function,
     input: Iterable[Any],
-    use_backups: bool = False,
+    use_backups: bool = True,
     backup_function: Optional[Function] = None,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
diff --git a/cubed/runtime/executors/python_async.py b/cubed/runtime/executors/python_async.py
index 01db8078..f0dc73db 100644
--- a/cubed/runtime/executors/python_async.py
+++ b/cubed/runtime/executors/python_async.py
@@ -27,7 +27,7 @@ async def map_unordered(
     function: Callable[..., Any],
     input: Iterable[Any],
     retries: int = 2,
-    use_backups: bool = False,
+    use_backups: bool = True,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
     name: Optional[str] = None,
diff --git a/cubed/tests/runtime/test_dask_distributed_async.py b/cubed/tests/runtime/test_dask_distributed_async.py
index 64b07869..808fe91b 100644
--- a/cubed/tests/runtime/test_dask_distributed_async.py
+++ b/cubed/tests/runtime/test_dask_distributed_async.py
@@ -43,12 +43,14 @@ async def run_test(function, input, retries, use_backups=False, batch_size=None)
     ],
 )
 # fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
     outputs = asyncio.run(
         run_test(
             function=partial(deterministic_failure, tmp_path, timing_map),
             input=range(n_tasks),
             retries=retries,
+            use_backups=use_backups,
         )
     )
@@ -66,13 +68,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
     ],
 )
 # fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
     with pytest.raises(RuntimeError):
         asyncio.run(
             run_test(
                 function=partial(deterministic_failure, tmp_path, timing_map),
                 input=range(n_tasks),
                 retries=retries,
+                use_backups=use_backups,
             )
         )
diff --git a/cubed/tests/runtime/test_lithops.py b/cubed/tests/runtime/test_lithops.py
index 55a04415..56838f06 100644
--- a/cubed/tests/runtime/test_lithops.py
+++ b/cubed/tests/runtime/test_lithops.py
@@ -44,11 +44,13 @@ def run_test(function, input, retries, timeout=10, use_backups=False):
     ],
 )
 # fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
     outputs = run_test(
         function=partial(deterministic_failure, tmp_path, timing_map),
         input=range(n_tasks),
         retries=retries,
+        use_backups=use_backups,
     )
 
     assert outputs == set(range(n_tasks))
@@ -65,12 +67,14 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
     ],
 )
 # fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
     with pytest.raises(RuntimeError):
         run_test(
             function=partial(deterministic_failure, tmp_path, timing_map),
             input=range(n_tasks),
             retries=retries,
+            use_backups=use_backups,
         )
 
     check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
diff --git a/cubed/tests/runtime/test_modal_async.py b/cubed/tests/runtime/test_modal_async.py
index 1904a3d2..88767b19 100644
--- a/cubed/tests/runtime/test_modal_async.py
+++ b/cubed/tests/runtime/test_modal_async.py
@@ -80,15 +80,17 @@ async def run_test(app_function, input, use_backups=False, batch_size=None, **kw
     ],
 )
 # fmt: on
+@pytest.mark.parametrize("use_backups", [False, True])
 @pytest.mark.cloud
-def test_success(timing_map, n_tasks, retries):
+def test_success(timing_map, n_tasks, retries, use_backups):
     try:
         outputs = asyncio.run(
             run_test(
                 app_function=deterministic_failure_modal,
                 input=range(n_tasks),
+                use_backups=use_backups,
                 path=tmp_path,
-                timing_map=timing_map
+                timing_map=timing_map,
             )
         )
@@ -109,14 +111,16 @@
+@pytest.mark.parametrize("use_backups", [False, True])
 @pytest.mark.cloud
-def test_failure(timing_map, n_tasks, retries):
+def test_failure(timing_map, n_tasks, retries, use_backups):
     try:
         with pytest.raises(RuntimeError):
             asyncio.run(
                 run_test(
                     app_function=deterministic_failure_modal,
                     input=range(n_tasks),
+                    use_backups=use_backups,
                     path=tmp_path,
                     timing_map=timing_map,
                 )
             )
@@ -137,13 +141,15 @@
+@pytest.mark.parametrize("use_backups", [False, True])
 @pytest.mark.cloud
-def test_large_number_of_tasks(timing_map, n_tasks, retries):
+def test_large_number_of_tasks(timing_map, n_tasks, retries, use_backups):
     try:
         outputs = asyncio.run(
             run_test(
                 app_function=deterministic_failure_modal,
                 input=range(n_tasks),
+                use_backups=use_backups,
                 path=tmp_path,
                 timing_map=timing_map
             )
         )
diff --git a/cubed/tests/runtime/test_python_async.py b/cubed/tests/runtime/test_python_async.py
index 3e27d797..efa9e296 100644
--- a/cubed/tests/runtime/test_python_async.py
+++ b/cubed/tests/runtime/test_python_async.py
@@ -40,12 +40,14 @@ async def run_test(function, input, retries=2, use_backups=False, batch_size=Non
     ],
 )
 # fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
     outputs = asyncio.run(
         run_test(
             function=partial(deterministic_failure, tmp_path, timing_map),
             input=range(n_tasks),
             retries=retries,
+            use_backups=use_backups,
         )
     )
@@ -62,13 +64,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
     ],
 )
 # fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
     with pytest.raises(RuntimeError):
         asyncio.run(
             run_test(
                 function=partial(deterministic_failure, tmp_path, timing_map),
                 input=range(n_tasks),
                 retries=retries,
+                use_backups=use_backups,
             )
         )
diff --git a/docs/user-guide/reliability.md b/docs/user-guide/reliability.md
index 0e819d27..7030d27a 100644
--- a/docs/user-guide/reliability.md
+++ b/docs/user-guide/reliability.md
@@ -23,4 +23,4 @@ A few slow running tasks (called stragglers) can disproportionately slow down th
 
 When a backup task is launched the original task is not cancelled, so it is to be expected that both tasks will complete and write their (identical) output. This only works since tasks are idempotent and each write a single, whole Zarr chunk in an atomic operation. (Updates to a single key are atomic in both [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel) and Google Cloud Storage.)
 
-Note that this feature is experimental and disabled by default since it has not been tested at scale yet.
+Backup tasks are enabled by default, but if you need to turn them off you can do so with ``use_backups=False``.
diff --git a/docs/user-guide/scaling.md b/docs/user-guide/scaling.md
index 7d7db6a1..371e5b23 100644
--- a/docs/user-guide/scaling.md
+++ b/docs/user-guide/scaling.md
@@ -35,7 +35,8 @@ Weak scaling requires more workers than output chunks, so for large problems it
 
 With fewer workers than chunks we would expect linear strong scaling, as every new worker added has nothing to wait for.
 Stragglers are tasks that take much longer than average, who disproportionately hold up the next step of the computation.
-To handle stragglers, you should consider turning on backups (with ``use_backups=True``), as any failures that are restarted essentially become stragglers.
+Stragglers are handled by launching backup tasks for any tasks that are running very slowly. This feature is enabled by default, but
+if you need to turn it off you can do so with ``use_backups=False``.
 Worker start-up time is another practical speed consideration, though it would delay computations of all scales equally.
 
 ### Multi-step Calculation
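For anyone trying the change out: `use_backups` is popped from the executor kwargs, so callers can still opt out per computation. Below is a minimal, hypothetical usage sketch (not part of this diff); it assumes that `compute()` forwards extra keyword arguments to the executor, which pops ``use_backups`` as `execute_dag` does above, and the spec values and array sizes are placeholders only.

```python
import cubed
import cubed.array_api as xp

# Illustrative values only: a small local spec and array to keep the run cheap.
spec = cubed.Spec(work_dir="tmp", allowed_mem="1GB")
a = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)

# use_backups=True is now the default; pass False to opt out for this call.
# (Assumes compute() forwards extra keyword arguments to the executor.)
result = (a + 1).compute(use_backups=False)
```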