Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic Ray execution backend #2921

Merged
merged 40 commits into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
245f49d
Basic ray execution backend
Apr 11, 2022
f250ee3
Fix lint
Apr 16, 2022
b20748d
Merge remote-tracking branch 'upstream/master' into basic_ray_executi…
Apr 18, 2022
8fbab7c
Refine ExecutionAPI to support meta updating optimization #2912
Apr 18, 2022
af2c436
Merge remote-tracking branch 'upstream/master' into basic_ray_executi…
Apr 18, 2022
d1a448b
Fix
Apr 19, 2022
63e3e67
Fix
Apr 19, 2022
0d54f8c
Fix
Apr 19, 2022
d92f380
Merge remote-tracking branch 'upstream/master' into basic_ray_executi…
Apr 19, 2022
4396c14
Refine code
Apr 19, 2022
c5e78a6
Fix no bands info for ray execution backend
Apr 19, 2022
5991ef3
Fix
Apr 20, 2022
6e10497
Fix
Apr 20, 2022
6a53b23
Pin xgboost<1.6.0 to avoid breaking API
Apr 20, 2022
0282838
Fix
Apr 20, 2022
9e1d91a
Remove debug code
Apr 20, 2022
a4193a4
Merge remote-tracking branch 'upstream/master' into basic_ray_executi…
Apr 20, 2022
451492b
Ensure coverage for ray execution backend.
Apr 20, 2022
f93e243
Fix
Apr 20, 2022
8d4db20
Refine Fetcher
Apr 20, 2022
5bebe14
Fix
Apr 20, 2022
41b464a
Fix
Apr 21, 2022
0411cd9
Refine session backend
Apr 21, 2022
3573204
Fix
Apr 21, 2022
d29ace9
Fix
Apr 21, 2022
4a101e7
Fix lint
Apr 21, 2022
704e036
Fix
Apr 21, 2022
d986d6e
Merge remote-tracking branch 'upstream/master' into basic_ray_executi…
Apr 22, 2022
b7f2c27
Debug CI
Apr 22, 2022
60a221e
Revert "Debug CI"
Apr 22, 2022
5717858
Improve test session
Apr 22, 2022
2247695
Debug CI
Apr 22, 2022
6a1f0ff
Fix lint
Apr 22, 2022
2bb9d14
Debug CI2
Apr 22, 2022
edf6616
Try to Fix CI
Apr 22, 2022
e4b2587
Revert "Debug CI2"
Apr 22, 2022
6c79007
Revert "Debug CI"
Apr 22, 2022
091f4e0
Fix CI
Apr 23, 2022
f5e8d44
Improve coverage
Apr 23, 2022
76316da
Fix CI
Apr 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions mars/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ def ray_start_regular_shared(request): # pragma: no cover
yield from _ray_start_regular(request)


@pytest.fixture(scope="module")
def ray_start_regular_shared2(request):  # pragma: no cover
    """Module-scoped Ray cluster fixture with a parametrizable CPU count.

    Reads ``request.param`` (a dict, if the test was parametrized) for
    ``num_cpus`` (default 64), starts Ray, yields the ``ray.init`` handle,
    and shuts the cluster down on teardown.
    """
    options = getattr(request, "param", {})
    cpu_count = options.get("num_cpus", 64)
    # NOTE(review): variable is named "mb" but the value is
    # num_cpus * 2 * 1024**2 — confirm the intended unit/amount against
    # the JobConfig API this targets.
    total_memory_mb = cpu_count * 2 * 1024**2
    try:
        try:
            job_config = ray.job_config.JobConfig(total_memory_mb=total_memory_mb)
        except TypeError:
            # Ray versions whose JobConfig does not accept total_memory_mb.
            job_config = None
        yield ray.init(num_cpus=cpu_count, job_config=job_config)
    finally:
        ray.shutdown()


@pytest.fixture
def ray_start_regular(request):  # pragma: no cover
    # Function-scoped variant of the shared fixture: delegates setup and
    # teardown to the _ray_start_regular generator (defined elsewhere in
    # this file) — presumably starting/stopping a Ray instance per test;
    # confirm against _ray_start_regular. `yield from` keeps pytest's
    # generator-fixture protocol (including throw/close) intact.
    yield from _ray_start_regular(request)
Expand Down
4 changes: 2 additions & 2 deletions mars/dataframe/contrib/raydataset/tests/test_mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def test_convert_to_ray_mldataset(
ray_start_regular_shared, create_cluster, test_option
):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
Expand All @@ -95,7 +95,7 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down
8 changes: 4 additions & 4 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def test_convert_to_ray_dataset(
ray_start_regular_shared, create_cluster, test_option
):
assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
Expand All @@ -77,7 +77,7 @@ async def test_mars_with_xgboost(ray_start_regular_shared, create_cluster):
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
pd_df = pd.concat([train_x, train_y], axis=1)
Expand Down Expand Up @@ -119,7 +119,7 @@ async def test_mars_with_xgboost_sklearn_clf(ray_start_regular_shared, create_cl
from sklearn.datasets import load_breast_cancer

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
train_x, train_y = load_breast_cancer(return_X_y=True, as_frame=True)
df: md.DataFrame = md.concat(
Expand Down Expand Up @@ -161,7 +161,7 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
from sklearn.datasets import make_regression

assert create_cluster.session
session = new_session(address=create_cluster.address, backend="oscar", default=True)
session = new_session(address=create_cluster.address, default=True)
with session:
np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
X, y = md.DataFrame(np_X), md.DataFrame({"target": np_y})
Expand Down
25 changes: 19 additions & 6 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ async def new_cluster_in_isolation(
timeout: float = None,
n_supervisor_process: int = 0,
) -> ClientType:
if subprocess_start_method is None:
subprocess_start_method = "spawn" if sys.platform == "win32" else "forkserver"
cluster = LocalCluster(
address,
n_worker,
Expand Down Expand Up @@ -125,11 +123,15 @@ def __init__(
subprocess_start_method: str = None,
config: Union[str, Dict] = None,
web: Union[bool, str] = "auto",
timeout: float = None,
n_supervisor_process: int = 0,
):
# load third party extensions.
init_extension_entrypoints()
# auto choose the subprocess_start_method.
if subprocess_start_method is None:
subprocess_start_method = (
"spawn" if sys.platform == "win32" else "forkserver"
)
# load config file to dict.
if not config or isinstance(config, str):
config = load_config(config)
Expand Down Expand Up @@ -268,11 +270,22 @@ def __init__(self: ClientType, cluster: ClusterType, session: AbstractSession):

@classmethod
async def create(
cls, cluster: LocalCluster, backend: str = None, timeout: float = None
cls,
cluster: LocalCluster,
backend: str = None,
timeout: float = None,
) -> ClientType:
backend = backend or "oscar"
if backend is None:
backend = (
cluster._config.get("task", {})
.get("task_executor_config", {})
.get("backend", "mars")
)
session = await _new_session(
cluster.external_address, backend=backend, default=True, timeout=timeout
cluster.external_address,
backend=backend,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it's weird to see the backend here actually takes no effect, both Mars & ray pass the backend with value oscar, I wonder if we can unify the backend and execution_backend?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we can reuse the backend for the ray execution backend? e.g.

  • backend == 'oscar' for Mars & Mars on Ray
  • backend == 'ray' for Mars on Ray DAG

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just name mars for mars, ray for mars on ray?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the backend is bound to the session cls. The execution backend is only for the execution implementation. I am not sure if it is a good idea to mix them into one.

Currently,

  • backend
    • oscar - _IsolatedSession
    • test - CheckedSession
  • execution backend
    • mars - Mars, Mars on Ray
    • ray - Mars on Ray DAG

I can mix them into one,

  • backend
    • mars - Mars, Mars on Ray
    • ray - Mars on Ray DAG

The backend will never be bound to the session cls.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's OK — the backend names different backends, not the session cls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will push a commit to fix this. Thanks.

default=True,
timeout=timeout,
fyrestone marked this conversation as resolved.
Show resolved Hide resolved
)
client = LocalClient(cluster, session)
session.client = client
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def new_ray_session(
session_id = session_id or client.session.session_id
address = client.address
session = new_session(
address=address, session_id=session_id, backend="oscar", default=default
address=address, session_id=session_id, backend="mars", default=default
)
session._ray_client = client
if default:
Expand Down
91 changes: 45 additions & 46 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from numbers import Integral
from urllib.parse import urlparse
from weakref import ref, WeakKeyDictionary, WeakSet
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Union

import numpy as np

Expand All @@ -54,6 +54,7 @@
from ...services.mutable import MutableAPI, MutableTensor
from ...services.storage import StorageAPI
from ...services.task import AbstractTaskAPI, TaskAPI, TaskResult
from ...services.task.execution.api import Fetcher
from ...services.web import OscarWebAPI
from ...tensor.utils import slice_split
from ...typing import ClientType, BandType
Expand Down Expand Up @@ -442,7 +443,7 @@ def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
Expand Down Expand Up @@ -659,14 +660,6 @@ def fetch_log(
return fetch(tileables, self, offsets=offsets, sizes=sizes)


_type_name_to_session_cls: Dict[str, Type[AbstractAsyncSession]] = dict()


def register_session_cls(session_cls: Type[AbstractAsyncSession]):
_type_name_to_session_cls[session_cls.name] = session_cls
return session_cls


@dataclass
class ChunkFetchInfo:
tileable: TileableType
Expand Down Expand Up @@ -756,14 +749,12 @@ def gen_submit_tileable_graph(
return graph, to_execute_tileables


@register_session_cls
class _IsolatedSession(AbstractAsyncSession):
name = "oscar"

def __init__(
self,
address: str,
session_id: str,
backend: str,
session_api: AbstractSessionAPI,
meta_api: AbstractMetaAPI,
lifecycle_api: AbstractLifecycleAPI,
Expand All @@ -776,6 +767,7 @@ def __init__(
request_rewriter: Callable = None,
):
super().__init__(address, session_id)
self._backend = backend
self._session_api = session_api
self._task_api = task_api
self._meta_api = meta_api
Expand All @@ -801,7 +793,12 @@ def __init__(

@classmethod
async def _init(
cls, address: str, session_id: str, new: bool = True, timeout: float = None
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
):
session_api = await SessionAPI.create(address)
if new:
Expand All @@ -821,6 +818,7 @@ async def _init(
return cls(
address,
session_id,
backend,
session_api,
meta_api,
lifecycle_api,
Expand All @@ -837,6 +835,7 @@ async def init(
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
**kwargs,
Expand All @@ -860,12 +859,19 @@ async def init(
return await _IsolatedWebSession._init(
address,
session_id,
backend,
new=new,
timeout=timeout,
request_rewriter=request_rewriter,
)
else:
return await cls._init(address, session_id, new=new, timeout=timeout)
return await cls._init(
address,
session_id,
backend,
new=new,
timeout=timeout,
)

async def _update_progress(self, task_id: str, progress: Progress):
zero_acc_time = 0
Expand Down Expand Up @@ -1085,6 +1091,8 @@ async def fetch(self, *tileables, **kwargs) -> list:
unexpected_keys = ", ".join(list(kwargs.keys()))
raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}")

fetcher = Fetcher.create(self._backend, get_storage_api=self._get_storage_api)
hekaisheng marked this conversation as resolved.
Show resolved Hide resolved

with enter_mode(build=True):
chunks = []
get_chunk_metas = []
Expand All @@ -1100,7 +1108,10 @@ async def fetch(self, *tileables, **kwargs) -> list:
continue
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(chunk.key, fields=["bands"])
self._meta_api.get_chunk_meta.delay(
chunk.key,
fields=fetcher.required_meta_keys,
)
)
indexes = (
chunk_to_slice[chunk] if chunk_to_slice is not None else None
Expand All @@ -1109,29 +1120,17 @@ async def fetch(self, *tileables, **kwargs) -> list:
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=indexes)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
chunk_to_band = {
chunk: meta["bands"][0] for chunk, meta in zip(chunks, chunk_metas)
}

storage_api_to_gets = defaultdict(list)
storage_api_to_fetch_infos = defaultdict(list)
for fetch_info in itertools.chain(*fetch_infos_list):
conditions = fetch_info.indexes
chunk = fetch_info.chunk
band = chunk_to_band[chunk]
storage_api = await self._get_storage_api(band)
storage_api_to_gets[storage_api].append(
storage_api.get.delay(chunk.key, conditions=conditions)
)
storage_api_to_fetch_infos[storage_api].append(fetch_info)
for storage_api in storage_api_to_gets:
fetched_data = await storage_api.get.batch(
*storage_api_to_gets[storage_api]
)
infos = storage_api_to_fetch_infos[storage_api]
for info, data in zip(infos, fetched_data):
info.data = data
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
for chunk, meta, fetch_info in zip(
chunks, chunk_metas, itertools.chain(*fetch_infos_list)
):
await fetcher.append(chunk.key, meta, fetch_info.indexes)
fetched_data = await fetcher.get()
for fetch_info, data in zip(
itertools.chain(*fetch_infos_list), fetched_data
):
fetch_info.data = data

result = []
for tileable, fetch_infos in zip(tileables, fetch_infos_list):
Expand Down Expand Up @@ -1318,6 +1317,7 @@ async def _init(
cls,
address: str,
session_id: str,
backend: str,
new: bool = True,
timeout: float = None,
request_rewriter: Callable = None,
Expand All @@ -1342,6 +1342,7 @@ async def _init(
return cls(
address,
session_id,
backend,
session_api,
meta_api,
lifecycle_api,
Expand Down Expand Up @@ -1416,13 +1417,12 @@ async def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
session_cls = _type_name_to_session_cls[backend]
isolation = ensure_isolation_created(kwargs)
coro = session_cls.init(address, session_id, new=new, **kwargs)
coro = _IsolatedSession.init(address, session_id, backend, new=new, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
isolated_session = await asyncio.wrap_future(fut)
return AsyncSession(address, session_id, isolated_session, isolation)
Expand Down Expand Up @@ -1588,13 +1588,12 @@ def init(
cls,
address: str,
session_id: str,
backend: str = "oscar",
backend: str = "mars",
new: bool = True,
**kwargs,
) -> "AbstractSession":
session_cls = _type_name_to_session_cls[backend]
isolation = ensure_isolation_created(kwargs)
coro = session_cls.init(address, session_id, new=new, **kwargs)
coro = _IsolatedSession.init(address, session_id, backend, new=new, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, isolation.loop)
isolated_session = fut.result()
return SyncSession(address, session_id, isolated_session, isolation)
Expand Down Expand Up @@ -1964,7 +1963,7 @@ def _new_session_id():
async def _new_session(
address: str,
session_id: str = None,
backend: str = "oscar",
backend: str = "mars",
default: bool = False,
**kwargs,
) -> AbstractSession:
Expand All @@ -1982,7 +1981,7 @@ async def _new_session(
def new_session(
address: str = None,
session_id: str = None,
backend: str = "oscar",
backend: str = "mars",
default: bool = True,
new: bool = True,
**kwargs,
Expand Down
10 changes: 10 additions & 0 deletions mars/deploy/oscar/tests/local_test_with_ray_dag_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"@inherits": '@default'
session:
custom_log_dir: auto
plasma:
store_memory: 32M
scheduling:
mem_hard_limit: 0
task:
task_executor_config:
"backend": "ray"
Loading