From 9221117357b72cef4bf1331a3079c1ef221f80c5 Mon Sep 17 00:00:00 2001 From: Tom White Date: Wed, 21 Feb 2024 15:48:50 +0000 Subject: [PATCH] Add function to create executors by name and keyword arguments --- cubed/runtime/create.py | 44 +++++++++++++++++++++++++++++++++++++++++ cubed/spec.py | 11 ++++++++++- cubed/tests/utils.py | 40 ++++++++++++------------------------- 3 files changed, 67 insertions(+), 28 deletions(-) create mode 100644 cubed/runtime/create.py diff --git a/cubed/runtime/create.py b/cubed/runtime/create.py new file mode 100644 index 00000000..0db4b5b1 --- /dev/null +++ b/cubed/runtime/create.py @@ -0,0 +1,44 @@ +from typing import Optional + +from cubed.runtime.types import Executor + + +def create_executor(name: str, executor_options: Optional[dict] = None) -> Executor: + """Create an executor from an executor name.""" + executor_options = executor_options or {} + if name == "beam": + from cubed.runtime.executors.beam import BeamDagExecutor + + return BeamDagExecutor(**executor_options) + elif name == "coiled": + from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor + + return CoiledFunctionsDagExecutor(**executor_options) + elif name == "dask": + from cubed.runtime.executors.dask_distributed_async import ( + AsyncDaskDistributedExecutor, + ) + + return AsyncDaskDistributedExecutor(**executor_options) + elif name == "lithops": + from cubed.runtime.executors.lithops import LithopsDagExecutor + + return LithopsDagExecutor(**executor_options) + elif name == "modal": + from cubed.runtime.executors.modal_async import AsyncModalDagExecutor + + return AsyncModalDagExecutor(**executor_options) + elif name == "modal-sync": + from cubed.runtime.executors.modal import ModalDagExecutor + + return ModalDagExecutor(**executor_options) + elif name == "single-threaded": + from cubed.runtime.executors.python import PythonDagExecutor + + return PythonDagExecutor(**executor_options) + elif name == "threads": + from cubed.runtime.executors.python_async import AsyncPythonDagExecutor + + return AsyncPythonDagExecutor(**executor_options) + else: + raise ValueError(f"Unrecognized executor name: {name}") diff --git a/cubed/spec.py b/cubed/spec.py index 3f11994f..372b2e15 100644 --- a/cubed/spec.py +++ b/cubed/spec.py @@ -1,5 +1,6 @@ from typing import Optional, Union +from cubed.runtime.create import create_executor from cubed.runtime.types import Executor from cubed.utils import convert_to_bytes @@ -13,6 +14,8 @@ def __init__( allowed_mem: Union[int, str, None] = None, reserved_mem: Union[int, str, None] = 0, executor: Union[Executor, None] = None, + executor_name: Optional[str] = None, + executor_options: Optional[dict] = None, storage_options: Union[dict, None] = None, ): """ @@ -45,7 +48,13 @@ def __init__( else: self._allowed_mem = convert_to_bytes(allowed_mem) - self._executor = executor + if executor is not None: + self._executor = executor + elif executor_name is not None: + self._executor = create_executor(executor_name, executor_options) + else: + self._executor = None + self._storage_options = storage_options @property diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py index 1115e662..ab3ea264 100644 --- a/cubed/tests/utils.py +++ b/cubed/tests/utils.py @@ -5,60 +5,46 @@ import numpy as np import zarr -from cubed.runtime.executors.python import PythonDagExecutor -from cubed.runtime.executors.python_async import AsyncPythonDagExecutor +from cubed.runtime.create import create_executor from cubed.runtime.types import Callback LITHOPS_LOCAL_CONFIG = {"lithops": {"backend": "localhost", "storage": "localhost"}} -ALL_EXECUTORS = [PythonDagExecutor()] +ALL_EXECUTORS = [create_executor("single-threaded")] # don't run all tests on every executor as it's too slow, so just have a subset -MAIN_EXECUTORS = [PythonDagExecutor()] +MAIN_EXECUTORS = [create_executor("single-threaded")] if platform.system() != "Windows": # AsyncPythonDagExecutor calls `peak_measured_mem` which is not supported on Windows - ALL_EXECUTORS.append(AsyncPythonDagExecutor()) + ALL_EXECUTORS.append(create_executor("threads")) try: - from cubed.runtime.executors.beam import BeamDagExecutor - - ALL_EXECUTORS.append(BeamDagExecutor()) - - MAIN_EXECUTORS.append(BeamDagExecutor()) + ALL_EXECUTORS.append(create_executor("beam")) + MAIN_EXECUTORS.append(create_executor("beam")) except ImportError: pass try: - from cubed.runtime.executors.dask_distributed_async import ( - AsyncDaskDistributedExecutor, - ) - - ALL_EXECUTORS.append(AsyncDaskDistributedExecutor()) - - MAIN_EXECUTORS.append(AsyncDaskDistributedExecutor()) + ALL_EXECUTORS.append(create_executor("dask")) + MAIN_EXECUTORS.append(create_executor("dask")) except ImportError: pass try: - from cubed.runtime.executors.lithops import LithopsDagExecutor - - ALL_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG)) - - MAIN_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG)) + executor_options = dict(config=LITHOPS_LOCAL_CONFIG) + ALL_EXECUTORS.append(create_executor("lithops", executor_options)) + MAIN_EXECUTORS.append(create_executor("lithops", executor_options)) except ImportError: pass MODAL_EXECUTORS = [] try: - from cubed.runtime.executors.modal import ModalDagExecutor - from cubed.runtime.executors.modal_async import AsyncModalDagExecutor - - MODAL_EXECUTORS.append(AsyncModalDagExecutor()) - MODAL_EXECUTORS.append(ModalDagExecutor()) + MODAL_EXECUTORS.append(create_executor("modal")) + MODAL_EXECUTORS.append(create_executor("modal-sync")) except ImportError: pass