Skip to content

Commit

Permalink
Add function to create executors by name and keyword arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Feb 22, 2024
1 parent 2a55838 commit 9221117
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 28 deletions.
44 changes: 44 additions & 0 deletions cubed/runtime/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Optional

from cubed.runtime.types import Executor


def create_executor(name: str, executor_options: Optional[dict] = None) -> Executor:
"""Create an executor from an executor name."""
executor_options = executor_options or {}
if name == "beam":
from cubed.runtime.executors.beam import BeamDagExecutor

return BeamDagExecutor(**executor_options)
elif name == "coiled":
from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor

return CoiledFunctionsDagExecutor(**executor_options)
elif name == "dask":
from cubed.runtime.executors.dask_distributed_async import (
AsyncDaskDistributedExecutor,
)

return AsyncDaskDistributedExecutor(**executor_options)
elif name == "lithops":
from cubed.runtime.executors.lithops import LithopsDagExecutor

return LithopsDagExecutor(**executor_options)
elif name == "modal":
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor

return AsyncModalDagExecutor(**executor_options)
elif name == "modal-sync":
from cubed.runtime.executors.modal import ModalDagExecutor

return ModalDagExecutor(**executor_options)
elif name == "single-threaded":
from cubed.runtime.executors.python import PythonDagExecutor

return PythonDagExecutor(**executor_options)
elif name == "threads":
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor

return AsyncPythonDagExecutor(**executor_options)
else:
raise ValueError(f"Unrecognized executor name: {name}")
11 changes: 10 additions & 1 deletion cubed/spec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional, Union

from cubed.runtime.create import create_executor
from cubed.runtime.types import Executor
from cubed.utils import convert_to_bytes

Expand All @@ -13,6 +14,8 @@ def __init__(
allowed_mem: Union[int, str, None] = None,
reserved_mem: Union[int, str, None] = 0,
executor: Union[Executor, None] = None,
executor_name: Optional[str] = None,
executor_options: Optional[dict] = None,
storage_options: Union[dict, None] = None,
):
"""
Expand Down Expand Up @@ -45,7 +48,13 @@ def __init__(
else:
self._allowed_mem = convert_to_bytes(allowed_mem)

self._executor = executor
if executor is not None:
self._executor = executor
elif executor_name is not None:
self._executor = create_executor(executor_name, executor_options)
else:
self._executor = None

self._storage_options = storage_options

@property
Expand Down
40 changes: 13 additions & 27 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,46 @@
import numpy as np
import zarr

from cubed.runtime.executors.python import PythonDagExecutor
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.runtime.create import create_executor
from cubed.runtime.types import Callback

LITHOPS_LOCAL_CONFIG = {"lithops": {"backend": "localhost", "storage": "localhost"}}

ALL_EXECUTORS = [PythonDagExecutor()]
ALL_EXECUTORS = [create_executor("single-threaded")]

# don't run all tests on every executor as it's too slow, so just have a subset
MAIN_EXECUTORS = [PythonDagExecutor()]
MAIN_EXECUTORS = [create_executor("single-threaded")]


if platform.system() != "Windows":
# AsyncPythonDagExecutor calls `peak_measured_mem` which is not supported on Windows
ALL_EXECUTORS.append(AsyncPythonDagExecutor())
ALL_EXECUTORS.append(create_executor("threads"))


try:
from cubed.runtime.executors.beam import BeamDagExecutor

ALL_EXECUTORS.append(BeamDagExecutor())

MAIN_EXECUTORS.append(BeamDagExecutor())
ALL_EXECUTORS.append(create_executor("beam"))
MAIN_EXECUTORS.append(create_executor("beam"))
except ImportError:
pass

try:
from cubed.runtime.executors.dask_distributed_async import (
AsyncDaskDistributedExecutor,
)

ALL_EXECUTORS.append(AsyncDaskDistributedExecutor())

MAIN_EXECUTORS.append(AsyncDaskDistributedExecutor())
ALL_EXECUTORS.append(create_executor("dask"))
MAIN_EXECUTORS.append(create_executor("dask"))
except ImportError:
pass

try:
from cubed.runtime.executors.lithops import LithopsDagExecutor

ALL_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))

MAIN_EXECUTORS.append(LithopsDagExecutor(config=LITHOPS_LOCAL_CONFIG))
executor_options = dict(config=LITHOPS_LOCAL_CONFIG)
ALL_EXECUTORS.append(create_executor("lithops", executor_options))
MAIN_EXECUTORS.append(create_executor("lithops", executor_options))
except ImportError:
pass

MODAL_EXECUTORS = []

try:
from cubed.runtime.executors.modal import ModalDagExecutor
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor

MODAL_EXECUTORS.append(AsyncModalDagExecutor())
MODAL_EXECUTORS.append(ModalDagExecutor())
MODAL_EXECUTORS.append(create_executor("modal"))
MODAL_EXECUTORS.append(create_executor("modal-sync"))
except ImportError:
pass

Expand Down

0 comments on commit 9221117

Please sign in to comment.