Skip to content

Commit

Permalink
Basic Ray execution backend (#2921)
Browse files Browse the repository at this point in the history
  • Loading branch information
fyrestone authored Apr 24, 2022
1 parent 11dc135 commit 9cfcc94
Show file tree
Hide file tree
Showing 30 changed files with 1,295 additions and 376 deletions.
15 changes: 15 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ def ray_start_regular_shared(request): # pragma: no cover
yield from _ray_start_regular(request)


@pytest.fixture(scope="module")
def ray_start_regular_shared2(request):  # pragma: no cover
    """Module-scoped Ray cluster fixture with a configurable CPU count.

    Reads ``request.param`` (if the test was parametrized) for ``num_cpus``,
    defaulting to 64, and starts a local Ray instance sized accordingly.
    The cluster is shut down when the module's tests finish.
    """
    params = getattr(request, "param", {})
    cpu_count = params.get("num_cpus", 64)
    # NOTE(review): variable is named *_mb but is computed as cpus * 2 * 1024**2;
    # confirm the intended unit against Ray's JobConfig(total_memory_mb=...) docs.
    memory_mb = cpu_count * 2 * 1024**2
    try:
        try:
            config = ray.job_config.JobConfig(total_memory_mb=memory_mb)
        except TypeError:
            # Older Ray releases do not accept total_memory_mb; fall back to
            # the default job config rather than failing the fixture.
            config = None
        yield ray.init(num_cpus=cpu_count, job_config=config)
    finally:
        ray.shutdown()


@pytest.fixture
def ray_start_regular(request): # pragma: no cover
yield from _ray_start_regular(request)
Expand Down
4 changes: 2 additions & 2 deletions mars/dataframe/contrib/raydataset/tests/test_mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def test_convert_to_ray_mldataset(
ray_start_regular_shared, create_cluster, test_option
):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
Expand All @@ -95,7 +95,7 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down
8 changes: 4 additions & 4 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def test_convert_to_ray_dataset(
ray_start_regular_shared, create_cluster, test_option
):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
Expand All @@ -77,7 +77,7 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
pd_df = pd.concat([train_x, train_y], axis=1)
Expand Down Expand Up @@ -119,7 +119,7 @@ async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cl
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down Expand Up @@ -161,7 +161,7 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
from sklearn.datasets import make_regression

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
X, y = md.DataFrame(np_X), md.DataFrame({"target": np_y})
Expand Down
25 changes: 19 additions & 6 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ async def new_cluster_in_isolation(
timeout: float = None,
n_supervisor_process: int = 0,
) -> ClientType:
if subprocess_start_method is None:
subprocess_start_method = "spawn" if sys.platform == "win32" else "forkserver"
cluster = LocalCluster(
address,
n_worker,
Expand Down Expand Up @@ -125,11 +123,15 @@ def __init__(
subprocess_start_method: str = None,
config: Union[str, Dict] = None,
web: Union[bool, str] = "auto",
timeout: float = None,
n_supervisor_process: int = 0,
):
# load third party extensions.
init_extension_entrypoints()
# auto choose the subprocess_start_method.
if subprocess_start_method is None:
subprocess_start_method = (
"spawn" if sys.platform == "win32" else "forkserver"
)
# load config file to dict.
if not config or isinstance(config, str):
config = load_config(config)
Expand Down Expand Up @@ -268,11 +270,22 @@ def __init__(self: ClientType, cluster: ClusterType, session: AbstractSession):

@classmethod
async def create(
cls, cluster: LocalCluster, backend: str = None, timeout: float = None
cls,
cluster: LocalCluster,
backend: str = None,
timeout: float = None,
) -> ClientType:
backend = backend or "oscar"
if backend is None:
backend = (
cluster._config.get("task", {})
.get("task_executor_config", {})
.get("backend", "mars")
)
session = await _new_session(
cluster.external_address, backend=backend, default=True, timeout=timeout
cluster.external_address,
backend=backend,
default=True,
timeout=timeout,
)
client = LocalClient(cluster, session)
session.client = client
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def new_ray_session(
session_id = session_id or client.session.session_id
address = client.address
session = new_session(
address=address, session_id=session_id, backend="oscar", default=default
address=address, session_id=session_id, backend="mars", default=default
)
session._ray_client = client
if default:
Expand Down
91 changes: 45 additions & 46 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from numbers import Integral
from urllib.parse import urlparse
from weakref import ref, WeakKeyDictionary, WeakSet
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union

import numpy as np

Expand All @@ -54,6 +54,7 @@
from ...services.mutable import MutableAPI, MutableTensor
from ...services.storage import StorageAPI
from ...services.task import AbstractTaskAPI, TaskAPI, TaskResult
from ...services.task.execution.api import Fetcher
from ...services.web import OscarWebAPI
from ...tensor.utils import slice_split
from ...typing import ClientType, BandType
Expand Down Expand Up @@ -441,7 +442,7 @@ def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
Expand Down Expand Up @@ -658,14 +659,6 @@ def fetch_log(
return fetch(tileables, self, offsets=offsets, sizes=sizes)


_type_name_to_session_cls: Dict[str, Type[AbstractAsyncSession]] = dict()


def register_session_cls(session_cls: Type[AbstractAsyncSession]):
_type_name_to_session_cls[session_cls.name] = session_cls
return session_cls


@dataclass
class ChunkFetchInfo:
tileable: TileableType
Expand Down Expand Up @@ -755,14 +748,12 @@ def gen_submit_tileable_graph(
return graph, to_execute_tileables


@register_session_cls
class _IsolatedSession(AbstractAsyncSession):
name = "oscar"

def __init__(
self,
address: str,
session_id: str,
backend: str,
session_api: AbstractSessionAPI,
meta_api: AbstractMetaAPI,
lifecycle_api: AbstractLifecycleAPI,
Expand All @@ -775,6 +766,7 @@ def __init__(
request_rewriter: Callable = None,
):
super().__init__(address, session_id)
self._backend = backend
self._session_api = session_api
self._task_api = task_api
self._meta_api = meta_api
Expand All @@ -800,7 +792,12 @@ def __init__(

@classmethod
async def _init(
cls, address: str, session_id: str, new: bool = True, timeout: float = None
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
):
session_api = await SessionAPI.create(address)
if new:
Expand All @@ -820,6 +817,7 @@ async def _init(
return cls(
address,
session_id,
backend,
session_api,
meta_api,
lifecycle_api,
Expand All @@ -836,6 +834,7 @@ async def init(
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
**kwargs,
Expand All @@ -859,12 +858,19 @@ async def init(
return await _IsolatedWebSession._init(
address,
session_id,
backend,
new=new,
timeout=timeout,
request_rewriter=request_rewriter,
)
else:
return await cls._init(address, session_id, new=new, timeout=timeout)
return await cls._init(
address,
session_id,
backend,
new=new,
timeout=timeout,
)

async def _update_progress(self, task_id: str, progress: Progress):
zero_acc_time = 0
Expand Down Expand Up @@ -1084,6 +1090,8 @@ async def fetch(self, *tileables, **kwargs) -> list:
unexpected_keys = ", ".join(list(kwargs.keys()))
raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}")

fetcher = Fetcher.create(self._backend, get_storage_api=self._get_storage_api)

with enter_mode(build=True):
chunks = []
get_chunk_metas = []
Expand All @@ -1099,7 +1107,10 @@ async def fetch(self, *tileables, **kwargs) -> list:
continue
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(chunk.key, fields=["bands"])
self._meta_api.get_chunk_meta.delay(
chunk.key,
fields=fetcher.required_meta_keys,
)
)
indexes = (
chunk_to_slice[chunk] if chunk_to_slice is not None else None
Expand All @@ -1108,29 +1119,17 @@ async def fetch(self, *tileables, **kwargs) -> list:
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=indexes)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
chunk_to_band = {
chunk: meta["bands"][0] for chunk, meta in zip(chunks, chunk_metas)
}

storage_api_to_gets = defaultdict(list)
storage_api_to_fetch_infos = defaultdict(list)
for fetch_info in itertools.chain(*fetch_infos_list):
conditions = fetch_info.indexes
chunk = fetch_info.chunk
band = chunk_to_band[chunk]
storage_api = await self._get_storage_api(band)
storage_api_to_gets[storage_api].append(
storage_api.get.delay(chunk.key, conditions=conditions)
)
storage_api_to_fetch_infos[storage_api].append(fetch_info)
for storage_api in storage_api_to_gets:
fetched_data = await storage_api.get.batch(
*storage_api_to_gets[storage_api]
)
infos = storage_api_to_fetch_infos[storage_api]
for info, data in zip(infos, fetched_data):
info.data = data
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
for chunk, meta, fetch_info in zip(
chunks, chunk_metas, itertools.chain(*fetch_infos_list)
):
await fetcher.append(chunk.key, meta, fetch_info.indexes)
fetched_data = await fetcher.get()
for fetch_info, data in zip(
itertools.chain(*fetch_infos_list), fetched_data
):
fetch_info.data = data

result = []
for tileable, fetch_infos in zip(tileables, fetch_infos_list):
Expand Down Expand Up @@ -1317,6 +1316,7 @@ async def _init(
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
request_rewriter: Callable = None,
Expand All @@ -1341,6 +1341,7 @@ async def _init(
return cls(
address,
session_id,
backend,
session_api,
meta_api,
lifecycle_api,
Expand Down Expand Up @@ -1415,13 +1416,12 @@ async def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
session_cls = _type_name_to_session_cls[backend]
isolation = ensure_isolation_created(kwargs)
coro = session_cls.init(address, session_id, new=new, **kwargs)
coro = _IsolatedSession.init(address, session_id, backend, new=new, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
isolated_session = await asyncio.wrap_future(fut)
return AsyncSession(address, session_id, isolated_session, isolation)
Expand Down Expand Up @@ -1587,13 +1587,12 @@ def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
session_cls = _type_name_to_session_cls[backend]
isolation = ensure_isolation_created(kwargs)
coro = session_cls.init(address, session_id, new=new, **kwargs)
coro = _IsolatedSession.init(address, session_id, backend, new=new, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
isolated_session = fut.result()
return SyncSession(address, session_id, isolated_session, isolation)
Expand Down Expand Up @@ -1963,7 +1962,7 @@ def _new_session_id():
async def _new_session(
address: str,
session_id: str = None,
backend: str = "oscar",
backend: str = "mars",
default: bool = False,
**kwargs,
) -> AbstractSession:
Expand All @@ -1981,7 +1980,7 @@ async def _new_session(
def new_session(
address: str = None,
session_id: str = None,
backend: str = "oscar",
backend: str = "mars",
default: bool = True,
new: bool = True,
**kwargs,
Expand Down
10 changes: 10 additions & 0 deletions mars/deploy/oscar/tests/local_test_with_ray_dag_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Test configuration: inherit the default Mars config and switch the task
# executor to the Ray DAG backend.
"@inherits": '@default'
session:
  # Derive the custom log directory automatically.
  custom_log_dir: auto
plasma:
  # Small plasma store for tests.
  store_memory: 32M
scheduling:
  # 0 disables the hard memory limit during tests.
  mem_hard_limit: 0
task:
  task_executor_config:
    # Select the Ray execution backend introduced by this commit.
    "backend": "ray"
Loading

0 comments on commit 9cfcc94

Please sign in to comment.