diff --git a/mars/conftest.py b/mars/conftest.py
index 0023586fdc..a30c7dc107 100644
--- a/mars/conftest.py
+++ b/mars/conftest.py
@@ -112,10 +112,8 @@ def _ray_large_cluster(request):  # pragma: no cover
     param = getattr(request, "param", {})
     num_nodes = param.get("num_nodes", 3)
     num_cpus = param.get("num_cpus", 16)
-    try:
-        from ray.cluster_utils import Cluster
-    except ModuleNotFoundError:
-        from ray._private.cluster_utils import Cluster
+    from ray.cluster_utils import Cluster
+
     cluster = Cluster()
     remote_nodes = []
     for i in range(num_nodes):
@@ -130,11 +128,14 @@ def _ray_large_cluster(request):  # pragma: no cover
            except TypeError:
                job_config = None
            ray.init(address=cluster.address, job_config=job_config)
-    register_ray_serializers()
+    use_ray_serialization = param.get("use_ray_serialization", True)
+    if use_ray_serialization:
+        register_ray_serializers()
     try:
-        yield
+        yield cluster
     finally:
-        unregister_ray_serializers()
+        if use_ray_serialization:
+            unregister_ray_serializers()
         Router.set_instance(None)
         RayServer.clear()
         ray.shutdown()
@@ -178,6 +179,14 @@ async def ray_create_mars_cluster(request, check_router_cleaned):
        Router.set_instance(None)
 
 
+@pytest.fixture
+def stop_mars():
+    yield
+    import mars
+
+    mars.stop_server()
+
+
 @pytest.fixture(scope="module")
 def _new_test_session(check_router_cleaned):
     from .deploy.oscar.tests.session import new_test_session
diff --git a/mars/dataframe/contrib/raydataset/dataset.py b/mars/dataframe/contrib/raydataset/dataset.py
index 6ec937d5b2..21581c59b8 100644
--- a/mars/dataframe/contrib/raydataset/dataset.py
+++ b/mars/dataframe/contrib/raydataset/dataset.py
@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import operator
+from functools import reduce
 from ....utils import lazy_import
 from .mldataset import _rechunk_if_needed
 
@@ -59,5 +61,6 @@ def __getstate__():
 
 
 def get_chunk_refs(df):
-    fetched_infos: Dict[str, List] = df.fetch_infos(fields=["object_id"])
-    return fetched_infos["object_id"]
+    fetched_infos: Dict[str, List] = df.fetch_infos(fields=["object_refs"])
+    object_refs = reduce(operator.concat, fetched_infos["object_refs"])
+    return object_refs
diff --git a/mars/dataframe/contrib/raydataset/mldataset.py b/mars/dataframe/contrib/raydataset/mldataset.py
index f239d766f9..badba7446d 100644
--- a/mars/dataframe/contrib/raydataset/mldataset.py
+++ b/mars/dataframe/contrib/raydataset/mldataset.py
@@ -104,10 +104,12 @@ def to_ray_mldataset(df, num_shards: int = None):
     # chunk1 for addr1,
     # chunk2 & chunk3 for addr2,
     # chunk4 for addr1
-    fetched_infos: Dict[str, List] = df.fetch_infos(fields=["band", "object_id"])
+    fetched_infos: Dict[str, List] = df.fetch_infos(fields=["bands", "object_refs"])
     chunk_addr_refs: List[Tuple[Tuple, "ray.ObjectRef"]] = [
-        (band, object_id)
-        for band, object_id in zip(fetched_infos["band"], fetched_infos["object_id"])
+        (bands[0], object_refs[0])
+        for bands, object_refs in zip(
+            fetched_infos["bands"], fetched_infos["object_refs"]
+        )
     ]
     group_to_obj_refs: Dict[str, List[ray.ObjectRef]] = _group_chunk_refs(
         chunk_addr_refs, num_shards
diff --git a/mars/dataframe/contrib/raydataset/tests/test_raydataset.py b/mars/dataframe/contrib/raydataset/tests/test_raydataset.py
index f4113cd6de..a66feacc16 100644
--- a/mars/dataframe/contrib/raydataset/tests/test_raydataset.py
+++ b/mars/dataframe/contrib/raydataset/tests/test_raydataset.py
@@ -59,7 +59,12 @@ async def test_convert_to_ray_dataset(
     with session:
         value = np.random.rand(10, 10)
         chunk_size, num_shards = test_option
-        df: md.DataFrame = md.DataFrame(value, chunk_size=chunk_size)
+        # Ray Dataset requires string column names
+        df: md.DataFrame = md.DataFrame(
+            value,
+            chunk_size=chunk_size,
+            columns=[f"c{i}" for i in range(value.shape[1])],
+        )
         df.execute()
 
         ds = mdd.to_ray_dataset(df, num_shards=num_shards)
@@ -161,7 +166,8 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
     session = new_session(address=create_cluster.address, default=True)
     with session:
         np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
-        X, y = md.DataFrame(np_X), md.DataFrame({"target": np_y})
+        columns = [f"c{i}" for i in range(np_X.shape[1])]
+        X, y = md.DataFrame(np_X, columns=columns), md.DataFrame({"target": np_y})
         df: md.DataFrame = md.concat([md.DataFrame(X), md.DataFrame(y)], axis=1)
         df.execute()
 
@@ -171,10 +177,10 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
 
         import gc
 
-        gc.collect()  # Ensure MLDataset does hold mars dataframe to avoid gc.
+        gc.collect()  # Ensure the Dataset holds the mars dataframe so it is not gc'ed.
         ray_params = RayParams(num_actors=2, cpus_per_actor=1)
         reg = RayXGBRegressor(ray_params=ray_params, random_state=42)
         # train
         reg.fit(RayDMatrix(ds, "target"), y=None, ray_params=ray_params)
         reg.predict(RayDMatrix(ds, "target"))
-        reg.predict(pd.DataFrame(np_X))
+        reg.predict(pd.DataFrame(np_X, columns=columns))
diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py
index 412afa55cb..cf08c962d7 100644
--- a/mars/deploy/oscar/session.py
+++ b/mars/deploy/oscar/session.py
@@ -1161,7 +1161,14 @@ async def fetch(self, *tileables, **kwargs) -> list:
         return result
 
     async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
-        available_fields = {"object_id", "level", "memory_size", "store_size", "band"}
+        available_fields = {
+            "object_id",
+            "object_refs",
+            "level",
+            "memory_size",
+            "store_size",
+            "bands",
+        }
         if fields is None:
             fields = available_fields
         else:
@@ -1175,34 +1182,22 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
         if kwargs:  # pragma: no cover
             unexpected_keys = ", ".join(list(kwargs.keys()))
             raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}")
-
+        # The following fields need to access the storage API to get the meta.
+        _need_query_storage_fields = {"level", "memory_size", "store_size"}
+        _need_query_storage = bool(_need_query_storage_fields & fields)
         with enter_mode(build=True):
-            chunks = []
-            get_chunk_metas = []
-            fetch_infos_list = []
-            for tileable in tileables:
-                fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
-                fetch_infos = []
-                for chunk in fetch_tileable.chunks:
-                    chunks.append(chunk)
-                    get_chunk_metas.append(
-                        self._meta_api.get_chunk_meta.delay(chunk.key, fields=["bands"])
-                    )
-                    fetch_infos.append(
-                        ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
-                    )
-                fetch_infos_list.append(fetch_infos)
-            chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
-            chunk_to_band = {
-                chunk: meta["bands"][0] for chunk, meta in zip(chunks, chunk_metas)
-            }
-
+            chunk_to_bands, fetch_infos_list, result = await self._query_meta_service(
+                tileables, fields, _need_query_storage
+            )
+            if not _need_query_storage:
+                assert result is not None
+                return result
             storage_api_to_gets = defaultdict(list)
             storage_api_to_fetch_infos = defaultdict(list)
             for fetch_info in itertools.chain(*fetch_infos_list):
                 chunk = fetch_info.chunk
-                band = chunk_to_band[chunk]
-                storage_api = await self._get_storage_api(band)
+                bands = chunk_to_bands[chunk]
+                storage_api = await self._get_storage_api(bands[0])
                 storage_api_to_gets[storage_api].append(
                     storage_api.get_infos.delay(chunk.key)
                 )
@@ -1219,7 +1214,7 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
         for fetch_infos in fetch_infos_list:
             fetched = defaultdict(list)
             for fetch_info in fetch_infos:
-                band = chunk_to_band[fetch_info.chunk]
+                bands = chunk_to_bands[fetch_info.chunk]
                 # Currently there's only one item in the returned List from storage_api.get_infos()
                 data = fetch_info.data[0]
                 if "object_id" in fields:
@@ -1232,12 +1227,47 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
                     fetched["store_size"].append(data.store_size)
                 # data.band misses ip info, e.g. 'numa-0'
                 # while band doesn't, e.g. (address0, 'numa-0')
-                if "band" in fields:
-                    fetched["band"].append(band)
+                if "bands" in fields:
+                    fetched["bands"].append(bands)
             result.append(fetched)
         return result
 
+    async def _query_meta_service(self, tileables, fields, query_storage):
+        chunks = []
+        get_chunk_metas = []
+        fetch_infos_list = []
+        for tileable in tileables:
+            fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
+            fetch_infos = []
+            for chunk in fetch_tileable.chunks:
+                chunks.append(chunk)
+                get_chunk_metas.append(
+                    self._meta_api.get_chunk_meta.delay(
+                        chunk.key,
+                        fields=["bands"] if query_storage else fields,
+                    )
+                )
+                fetch_infos.append(
+                    ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
+                )
+            fetch_infos_list.append(fetch_infos)
+        chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
+        if not query_storage:
+            result = []
+            chunk_to_meta = dict(zip(chunks, chunk_metas))
+            for fetch_infos in fetch_infos_list:
+                fetched = defaultdict(list)
+                for fetch_info in fetch_infos:
+                    for field in fields:
+                        fetched[field].append(chunk_to_meta[fetch_info.chunk][field])
+                result.append(fetched)
+            return {}, fetch_infos_list, result
+        chunk_to_bands = {
+            chunk: meta["bands"] for chunk, meta in zip(chunks, chunk_metas)
+        }
+        return chunk_to_bands, fetch_infos_list, None
+
     async def decref(self, *tileable_keys):
         logger.debug("Decref tileables on client: %s", tileable_keys)
         return await self._lifecycle_api.decref_tileables(list(tileable_keys))
diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py
index 986f0b1339..63535b789d 100644
--- a/mars/deploy/oscar/tests/test_local.py
+++ b/mars/deploy/oscar/tests/test_local.py
@@ -380,7 +380,12 @@ async def test_fetch_infos(create_cluster):
     assert "level" in fetched_infos
     assert "memory_size" in fetched_infos
     assert "store_size" in fetched_infos
-    assert "band" in fetched_infos
+    assert "bands" in fetched_infos
+
+    fetched_infos = df.fetch_infos(fields=["object_id", "bands"])
+    assert "object_id" in fetched_infos
+    assert "bands" in fetched_infos
+    assert len(fetched_infos) == 2
 
     fetch_infos((df, df), fields=None)
     results_infos = mr.ExecutableTuple([df, df]).execute()._fetch_infos()
@@ -389,7 +394,7 @@ async def test_fetch_infos(create_cluster):
     assert "level" in results_infos[0]
     assert "memory_size" in results_infos[0]
     assert "store_size" in results_infos[0]
-    assert "band" in results_infos[0]
+    assert "bands" in results_infos[0]
 
 
 async def _run_web_session_test(web_address):
diff --git a/mars/deploy/oscar/tests/test_ray.py b/mars/deploy/oscar/tests/test_ray.py
index c1056e73fc..d14d0e6568 100644
--- a/mars/deploy/oscar/tests/test_ray.py
+++ b/mars/deploy/oscar/tests/test_ray.py
@@ -14,12 +14,14 @@
 
 import asyncio
 import copy
+import operator
 import os
 import subprocess
 import sys
 import tempfile
 import threading
 import time
+from functools import reduce
 
 import numpy as np
 import pandas as pd
@@ -153,6 +155,12 @@ async def test_execute_describe(ray_start_regular, create_cluster):
 @pytest.mark.asyncio
 async def test_fetch_infos(ray_start_regular, create_cluster):
     await test_local.test_fetch_infos(create_cluster)
+    df = md.DataFrame(mt.random.RandomState(0).rand(5000, 1, chunk_size=1000))
+    df.execute()
+    fetched_infos = df.fetch_infos(fields=["object_refs"])
+    object_refs = reduce(operator.concat, fetched_infos["object_refs"])
+    assert len(fetched_infos) == 1
+    assert len(object_refs) == 5
 
 
 @require_ray
diff --git a/mars/deploy/oscar/tests/test_ray_dag_failover.py b/mars/deploy/oscar/tests/test_ray_dag_failover.py
new file mode 100644
index 0000000000..bf337b05d6
--- /dev/null
+++ b/mars/deploy/oscar/tests/test_ray_dag_failover.py
@@ -0,0 +1,108 @@
+# Copyright 1999-2021 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import operator
+from functools import reduce
+
+import pandas as pd
+import pytest
+
+import mars
+from .... import dataframe as md
+from .... import tensor as mt
+from ....tests.core import require_ray
+from ....utils import lazy_import
+
+ray = lazy_import("ray")
+
+
+@require_ray
+@pytest.mark.parametrize(
+    "ray_large_cluster",
+    [{"num_nodes": 0, "use_ray_serialization": False}],
+    indirect=True,
+)
+@pytest.mark.parametrize("reconstruction_enabled", [True, False])
+def test_basic_object_reconstruction(
+    ray_large_cluster, reconstruction_enabled, stop_mars
+):
+    config = {
+        "num_heartbeats_timeout": 10,
+        "raylet_heartbeat_period_milliseconds": 200,
+        "object_timeout_milliseconds": 200,
+    }
+    # Workaround to reset the config to the default value.
+    if not reconstruction_enabled:
+        config["lineage_pinning_enabled"] = False
+        subtask_max_retries = 0
+    else:
+        subtask_max_retries = 1
+
+    cluster = ray_large_cluster
+    # Head node with no resources.
+    cluster.add_node(
+        num_cpus=0,
+        _system_config=config,
+        enable_object_reconstruction=reconstruction_enabled,
+    )
+    ray.init(address=cluster.address)
+    # Node to place the initial object.
+    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
+    mars.new_session(
+        backend="ray",
+        config={"scheduling.subtask_max_retries": subtask_max_retries},
+        default=True,
+    )
+    cluster.wait_for_nodes()
+
+    df = md.DataFrame(mt.random.RandomState(0).rand(2_000_000, 1, chunk_size=1_000_000))
+    df.execute()
+    # This will submit new Ray tasks.
+    df2 = df.map_chunk(lambda pdf: pdf * 2).execute()
+    executed_infos = df2.fetch_infos(fields=["object_refs"])
+    object_refs = reduce(operator.concat, executed_infos["object_refs"])
+    head5 = df2.head(5).to_pandas()
+
+    cluster.remove_node(node_to_kill, allow_graceful=False)
+    node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
+
+    # Use a dependent task to avoid fetching the lost objects to the local node.
+    @ray.remote
+    def dependent_task(x):
+        return x
+
+    if reconstruction_enabled:
+        ray.get([dependent_task.remote(ref) for ref in object_refs])
+        new_head5 = df2.head(5).to_pandas()
+        pd.testing.assert_frame_equal(head5, new_head5)
+    else:
+        with pytest.raises(ray.exceptions.RayTaskError):
+            df2.head(5).to_pandas()
+        with pytest.raises(ray.exceptions.ObjectLostError):
+            ray.get(object_refs)
+
+    # Losing the object a second time will cause reconstruction to fail because
+    # we have reached the max task retries.
+    cluster.remove_node(node_to_kill, allow_graceful=False)
+    cluster.add_node(num_cpus=1, object_store_memory=10**8)
+
+    if reconstruction_enabled:
+        with pytest.raises(
+            ray.exceptions.ObjectReconstructionFailedMaxAttemptsExceededError
+        ):
+            ray.get(object_refs)
+    else:
+        with pytest.raises(ray.exceptions.ObjectLostError):
+            ray.get(object_refs)
diff --git a/mars/deploy/utils.py b/mars/deploy/utils.py
index 3f7378289b..8ca658cd96 100644
--- a/mars/deploy/utils.py
+++ b/mars/deploy/utils.py
@@ -163,6 +163,10 @@ def load_config(config: Union[str, Dict], default_config_file: str):
             "ensure enough homogeneous subtasks to calculate statistics."
         )
         config["task"]["default_config"]["initial_same_color_num"] = 1
+    ray_execution_config = config["task"]["execution_config"].setdefault("ray", {})
+    subtask_max_retries = config["scheduling"].get("subtask_max_retries")
+    if subtask_max_retries is not None:
+        ray_execution_config.setdefault("subtask_max_retries", subtask_max_retries)
     return config
 
 
diff --git a/mars/services/task/execution/ray/config.py b/mars/services/task/execution/ray/config.py
index f87deeefd7..1af7899995 100644
--- a/mars/services/task/execution/ray/config.py
+++ b/mars/services/task/execution/ray/config.py
@@ -18,10 +18,20 @@
 from ..utils import get_band_resources_from_config
 
 
+# The default number of times to retry a subtask.
+DEFAULT_SUBTASK_MAX_RETRIES = 3
+
+
 @register_config_cls
 class RayExecutionConfig(ExecutionConfig):
     name = "ray"
 
+    def __init__(self, execution_config: Dict):
+        super().__init__(execution_config)
+        self._subtask_max_retries = self._execution_config.get("ray", {}).get(
+            "subtask_max_retries", DEFAULT_SUBTASK_MAX_RETRIES
+        )
+
     def get_band_resources(self):
         """
         Get the band resources from config for generating ray virtual
@@ -31,3 +41,7 @@ class RayExecutionConfig(ExecutionConfig):
 
     def get_deploy_band_resources(self) -> List[Dict[str, Resource]]:
         return []
+
+    @property
+    def subtask_max_retries(self):
+        return self._subtask_max_retries
diff --git a/mars/services/task/execution/ray/executor.py b/mars/services/task/execution/ray/executor.py
index 7b52048ea4..201ba8da74 100644
--- a/mars/services/task/execution/ray/executor.py
+++ b/mars/services/task/execution/ray/executor.py
@@ -283,8 +283,11 @@ async def execute_subtask_graph(
         output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
         output_meta_keys = result_meta_keys & output_keys
         output_count = len(output_keys) + bool(output_meta_keys)
+        subtask_max_retries = (
+            self._config.subtask_max_retries if subtask.retryable else 0
+        )
         output_object_refs = self._ray_executor.options(
-            num_returns=output_count
+            num_returns=output_count, max_retries=subtask_max_retries
         ).remote(
             subtask.task_id,
             subtask.subtask_id,
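
For reference, a minimal usage sketch of the renamed fetch_infos fields ("band"/"object_id" become "bands"/"object_refs"). This is not part of the patch; it assumes a Ray-backed Mars session like the one set up in test_ray_dag_failover.py, and reuses only calls that appear in the tests above:

    import operator
    from functools import reduce

    import mars
    import mars.dataframe as md
    import mars.tensor as mt

    mars.new_session(backend="ray", default=True)

    df = md.DataFrame(mt.random.RandomState(0).rand(5000, 1, chunk_size=1000))
    df.execute()

    # "bands" yields the full band list per chunk (each band is a tuple such as
    # (address, "numa-0")); "object_refs" yields per-chunk lists of Ray ObjectRefs.
    infos = df.fetch_infos(fields=["bands", "object_refs"])
    first_bands = [bands[0] for bands in infos["bands"]]
    object_refs = reduce(operator.concat, infos["object_refs"])  # flatten to one list
    assert len(object_refs) == 5  # five chunks of 1000 rows each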
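A second sketch, showing how the new retry setting flows end to end, based on the changes above: load_config() copies "scheduling.subtask_max_retries" into task.execution_config["ray"]["subtask_max_retries"], RayExecutionConfig exposes it as subtask_max_retries (defaulting to DEFAULT_SUBTASK_MAX_RETRIES = 3), and execute_subtask_graph() passes it to Ray as max_retries, forcing 0 for non-retryable subtasks. The dotted config key is taken verbatim from the failover test:

    import mars

    # Retry each retryable subtask at most twice via Ray lineage reconstruction;
    # non-retryable subtasks are always submitted with max_retries=0.
    mars.new_session(
        backend="ray",
        config={"scheduling.subtask_max_retries": 2},
        default=True,
    )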