[Ray] Support basic subtask retry and lineage reconstruction (mars-project#2969)

(cherry picked from commit 61c0c51)
chaokunyang authored and 继盛 committed May 30, 2022
1 parent 512c284 commit 654f9d5
Showing 11 changed files with 238 additions and 46 deletions.
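
The change lets the Ray backend re-execute lost subtasks and exposes per-chunk Ray object refs through fetch_infos, so lost chunk data can be rebuilt via Ray lineage reconstruction. A minimal usage sketch, mirroring the new test_ray_dag_failover.py at the end of this diff (the scheduling.subtask_max_retries key and backend="ray" option are taken from that test, not from separate documentation):

# Minimal sketch mirroring the new failover test: a Ray-backed Mars session with
# one subtask retry, so a lost chunk can be recomputed through Ray lineage.
import mars
import mars.dataframe as md
import mars.tensor as mt

mars.new_session(
    backend="ray",
    config={"scheduling.subtask_max_retries": 1},  # allow one re-execution per subtask
    default=True,
)

df = md.DataFrame(mt.random.RandomState(0).rand(2_000_000, 1, chunk_size=1_000_000))
df2 = df.map_chunk(lambda pdf: pdf * 2).execute()
# Per-chunk Ray object refs are now exposed through fetch_infos (see the session.py diff below).
print(df2.fetch_infos(fields=["object_refs"]))
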
23 changes: 16 additions & 7 deletions mars/conftest.py
@@ -112,10 +112,8 @@ def _ray_large_cluster(request): # pragma: no cover
param = getattr(request, "param", {})
num_nodes = param.get("num_nodes", 3)
num_cpus = param.get("num_cpus", 16)
try:
from ray.cluster_utils import Cluster
except ModuleNotFoundError:
from ray._private.cluster_utils import Cluster
from ray.cluster_utils import Cluster

cluster = Cluster()
remote_nodes = []
for i in range(num_nodes):
@@ -130,11 +128,14 @@ def _ray_large_cluster(request): # pragma: no cover
except TypeError:
job_config = None
ray.init(address=cluster.address, job_config=job_config)
register_ray_serializers()
use_ray_serialization = param.get("use_ray_serialization", True)
if use_ray_serialization:
register_ray_serializers()
try:
yield
yield cluster
finally:
unregister_ray_serializers()
if use_ray_serialization:
unregister_ray_serializers()
Router.set_instance(None)
RayServer.clear()
ray.shutdown()
@@ -178,6 +179,14 @@ async def ray_create_mars_cluster(request, check_router_cleaned):
Router.set_instance(None)


@pytest.fixture
def stop_mars():
yield
import mars

mars.stop_server()


@pytest.fixture(scope="module")
def _new_test_session(check_router_cleaned):
from .deploy.oscar.tests.session import new_test_session
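
The conftest changes let a test request the large Ray cluster without Mars' Ray serializers (new use_ray_serialization parameter), receive the Cluster object from the fixture, and tear Mars down afterwards via the new stop_mars fixture. A hedged sketch of how a test consumes this, following the pattern of the new test_ray_dag_failover.py further down (the public fixture name ray_large_cluster is assumed from that test's parametrization):

# Sketch of a test using the updated fixtures; parameter names and values follow
# the new failover test below rather than independent documentation.
import pytest

@pytest.mark.parametrize(
    "ray_large_cluster",
    [{"num_nodes": 0, "use_ray_serialization": False}],  # skip register_ray_serializers()
    indirect=True,
)
def test_with_bare_ray_cluster(ray_large_cluster, stop_mars):
    cluster = ray_large_cluster      # the fixture now yields the ray Cluster object
    cluster.add_node(num_cpus=1)     # nodes can be added per test as needed
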
7 changes: 5 additions & 2 deletions mars/dataframe/contrib/raydataset/dataset.py
@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import operator
from functools import reduce

from ....utils import lazy_import
from .mldataset import _rechunk_if_needed
@@ -59,5 +61,6 @@ def __getstate__():


def get_chunk_refs(df):
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["object_id"])
return fetched_infos["object_id"]
fetched_infos: Dict[str, List] = df.fetch_infos(["object_refs"])
object_refs = reduce(operator.concat, fetched_infos["object_refs"])
return object_refs
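
fetch_infos now returns one list of Ray object refs per chunk under the renamed object_refs field, so get_chunk_refs flattens the list of lists. A small illustration of the flattening, with made-up ref values:

# Hypothetical per-chunk structure returned by fetch_infos(["object_refs"]);
# reduce(operator.concat, ...) flattens it into a single list of refs.
import operator
from functools import reduce

fetched_infos = {"object_refs": [["ref_a"], ["ref_b", "ref_c"]]}
object_refs = reduce(operator.concat, fetched_infos["object_refs"])
assert object_refs == ["ref_a", "ref_b", "ref_c"]
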
8 changes: 5 additions & 3 deletions mars/dataframe/contrib/raydataset/mldataset.py
@@ -104,10 +104,12 @@ def to_ray_mldataset(df, num_shards: int = None):
# chunk1 for addr1,
# chunk2 & chunk3 for addr2,
# chunk4 for addr1
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["band", "object_id"])
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["bands", "object_refs"])
chunk_addr_refs: List[Tuple[Tuple, "ray.ObjectRef"]] = [
(band, object_id)
for band, object_id in zip(fetched_infos["band"], fetched_infos["object_id"])
(bands[0], object_refs[0])
for bands, object_refs in zip(
fetched_infos["bands"], fetched_infos["object_refs"]
)
]
group_to_obj_refs: Dict[str, List[ray.ObjectRef]] = _group_chunk_refs(
chunk_addr_refs, num_shards
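
With the band/object_id fields renamed to bands/object_refs, each chunk now contributes a list of bands and a list of object refs; to_ray_mldataset keys the shard grouping on the first entry of each. A hedged illustration of the data shape, with made-up values:

# Made-up fetch_infos output for two chunks; only the first band and first ref
# of each chunk feed the address-based shard grouping.
fetched_infos = {
    "bands": [[("ray://addr1", "numa-0")], [("ray://addr2", "numa-0")]],
    "object_refs": [["ref_1"], ["ref_2"]],
}
chunk_addr_refs = [
    (bands[0], object_refs[0])
    for bands, object_refs in zip(fetched_infos["bands"], fetched_infos["object_refs"])
]
# -> [(("ray://addr1", "numa-0"), "ref_1"), (("ray://addr2", "numa-0"), "ref_2")]
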
14 changes: 10 additions & 4 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
@@ -59,7 +59,12 @@ async def test_convert_to_ray_dataset(
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
df: md.DataFrame = md.DataFrame(value, chunk_size=chunk_size)
# ray dataset needs str columns
df: md.DataFrame = md.DataFrame(
value,
chunk_size=chunk_size,
columns=[f"c{i}" for i in range(value.shape[1])],
)
df.execute()

ds = mdd.to_ray_dataset(df, num_shards=num_shards)
Expand Down Expand Up @@ -161,7 +166,8 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
session = new_session(address=create_cluster.address, default=True)
with session:
np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
X, y = md.DataFrame(np_X), md.DataFrame({"target": np_y})
columns = [f"c{i}" for i in range(np_X.shape[1])]
X, y = md.DataFrame(np_X, columns=columns), md.DataFrame({"target": np_y})
df: md.DataFrame = md.concat([md.DataFrame(X), md.DataFrame(y)], axis=1)
df.execute()

@@ -171,10 +177,10 @@

import gc

gc.collect() # Ensure MLDataset does hold mars dataframe to avoid gc.
gc.collect() # Ensure Dataset does hold mars dataframe to avoid gc.
ray_params = RayParams(num_actors=2, cpus_per_actor=1)
reg = RayXGBRegressor(ray_params=ray_params, random_state=42)
# train
reg.fit(RayDMatrix(ds, "target"), y=None, ray_params=ray_params)
reg.predict(RayDMatrix(ds, "target"))
reg.predict(pd.DataFrame(np_X))
reg.predict(pd.DataFrame(np_X, columns=columns))
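
The test updates build the mars DataFrames with explicit string column names because Ray Dataset (and the downstream xgboost_ray calls) expect string columns. A minimal sketch of the construction used above; the import path for the raydataset contrib module is assumed from the test's imports, which are not shown in this hunk:

# Build a mars DataFrame with explicit "c0".."c9" string columns before
# converting it to a Ray dataset.
import numpy as np
import mars.dataframe as md
from mars.dataframe.contrib import raydataset as mdd  # assumed import path

value = np.random.rand(10, 10)
df = md.DataFrame(value, chunk_size=5, columns=[f"c{i}" for i in range(value.shape[1])])
df.execute()
ds = mdd.to_ray_dataset(df, num_shards=2)
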
84 changes: 57 additions & 27 deletions mars/deploy/oscar/session.py
@@ -1161,7 +1161,14 @@ async def fetch(self, *tileables, **kwargs) -> list:
return result

async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
available_fields = {"object_id", "level", "memory_size", "store_size", "band"}
available_fields = {
"object_id",
"object_refs",
"level",
"memory_size",
"store_size",
"bands",
}
if fields is None:
fields = available_fields
else:
@@ -1175,34 +1182,22 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
if kwargs: # pragma: no cover
unexpected_keys = ", ".join(list(kwargs.keys()))
raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}")

# following fields needs to access storage API to get the meta.
_need_query_storage_fields = {"level", "memory_size", "store_size"}
_need_query_storage = bool(_need_query_storage_fields & fields)
with enter_mode(build=True):
chunks = []
get_chunk_metas = []
fetch_infos_list = []
for tileable in tileables:
fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
fetch_infos = []
for chunk in fetch_tileable.chunks:
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(chunk.key, fields=["bands"])
)
fetch_infos.append(
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
chunk_to_band = {
chunk: meta["bands"][0] for chunk, meta in zip(chunks, chunk_metas)
}

chunk_to_bands, fetch_infos_list, result = await self._query_meta_service(
tileables, fields, _need_query_storage
)
if not _need_query_storage:
assert result is not None
return result
storage_api_to_gets = defaultdict(list)
storage_api_to_fetch_infos = defaultdict(list)
for fetch_info in itertools.chain(*fetch_infos_list):
chunk = fetch_info.chunk
band = chunk_to_band[chunk]
storage_api = await self._get_storage_api(band)
bands = chunk_to_bands[chunk]
storage_api = await self._get_storage_api(bands[0])
storage_api_to_gets[storage_api].append(
storage_api.get_infos.delay(chunk.key)
)
@@ -1219,7 +1214,7 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
for fetch_infos in fetch_infos_list:
fetched = defaultdict(list)
for fetch_info in fetch_infos:
band = chunk_to_band[fetch_info.chunk]
bands = chunk_to_bands[fetch_info.chunk]
# Currently there's only one item in the returned List from storage_api.get_infos()
data = fetch_info.data[0]
if "object_id" in fields:
Expand All @@ -1232,12 +1227,47 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
fetched["store_size"].append(data.store_size)
# data.band misses ip info, e.g. 'numa-0'
# while band doesn't, e.g. (address0, 'numa-0')
if "band" in fields:
fetched["band"].append(band)
if "bands" in fields:
fetched["bands"].append(bands)
result.append(fetched)

return result

async def _query_meta_service(self, tileables, fields, query_storage):
chunks = []
get_chunk_metas = []
fetch_infos_list = []
for tileable in tileables:
fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
fetch_infos = []
for chunk in fetch_tileable.chunks:
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(
chunk.key,
fields=["bands"] if query_storage else fields,
)
)
fetch_infos.append(
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
if not query_storage:
result = []
chunk_to_meta = dict(zip(chunks, chunk_metas))
for fetch_infos in fetch_infos_list:
fetched = defaultdict(list)
for fetch_info in fetch_infos:
for field in fields:
fetched[field].append(chunk_to_meta[fetch_info.chunk][field])
result.append(fetched)
return {}, fetch_infos_list, result
chunk_to_bands = {
chunk: meta["bands"] for chunk, meta in zip(chunks, chunk_metas)
}
return chunk_to_bands, fetch_infos_list, None

async def decref(self, *tileable_keys):
logger.debug("Decref tileables on client: %s", tileable_keys)
return await self._lifecycle_api.decref_tileables(list(tileable_keys))
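
The refactor splits fetch_infos into a meta-service pass (_query_meta_service) and an optional storage pass: meta-level fields such as "bands" are answered from chunk metas alone, while "level", "memory_size", and "store_size" still go through the storage API per band. A hedged usage sketch against a local session:

# Sketch: a fetch_infos call restricted to meta-level fields never touches the
# storage API; size fields still route through storage_api.get_infos.
import mars
import mars.dataframe as md
import mars.tensor as mt

mars.new_session(default=True)
df = md.DataFrame(mt.random.rand(10, 4, chunk_size=5)).execute()

meta_only = df.fetch_infos(fields=["bands"])                   # meta service only
sizes = df.fetch_infos(fields=["memory_size", "store_size"])   # also queries storage
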
9 changes: 7 additions & 2 deletions mars/deploy/oscar/tests/test_local.py
@@ -380,7 +380,12 @@ async def test_fetch_infos(create_cluster):
assert "level" in fetched_infos
assert "memory_size" in fetched_infos
assert "store_size" in fetched_infos
assert "band" in fetched_infos
assert "bands" in fetched_infos

fetched_infos = df.fetch_infos(fields=["object_id", "bands"])
assert "object_id" in fetched_infos
assert "bands" in fetched_infos
assert len(fetched_infos) == 2

fetch_infos((df, df), fields=None)
results_infos = mr.ExecutableTuple([df, df]).execute()._fetch_infos()
@@ -389,7 +394,7 @@
assert "level" in results_infos[0]
assert "memory_size" in results_infos[0]
assert "store_size" in results_infos[0]
assert "band" in results_infos[0]
assert "bands" in results_infos[0]


async def _run_web_session_test(web_address):
8 changes: 8 additions & 0 deletions mars/deploy/oscar/tests/test_ray.py
@@ -14,12 +14,14 @@

import asyncio
import copy
import operator
import os
import subprocess
import sys
import tempfile
import threading
import time
from functools import reduce

import numpy as np
import pandas as pd
@@ -153,6 +155,12 @@ async def test_execute_describe(ray_start_regular, create_cluster):
@pytest.mark.asyncio
async def test_fetch_infos(ray_start_regular, create_cluster):
await test_local.test_fetch_infos(create_cluster)
df = md.DataFrame(mt.random.RandomState(0).rand(5000, 1, chunk_size=1000))
df.execute()
fetched_infos = df.fetch_infos(fields=["object_refs"])
object_refs = reduce(operator.concat, fetched_infos["object_refs"])
assert len(fetched_infos) == 1
assert len(object_refs) == 5


@require_ray
108 changes: 108 additions & 0 deletions mars/deploy/oscar/tests/test_ray_dag_failover.py
@@ -0,0 +1,108 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import operator
from functools import reduce

import pandas as pd
import pytest

import mars
from .... import dataframe as md
from .... import tensor as mt
from ....tests.core import require_ray
from ....utils import lazy_import

ray = lazy_import("ray")


@require_ray
@pytest.mark.parametrize(
"ray_large_cluster",
[{"num_nodes": 0, "use_ray_serialization": False}],
indirect=True,
)
@pytest.mark.parametrize("reconstruction_enabled", [True, False])
def test_basic_object_reconstruction(
ray_large_cluster, reconstruction_enabled, stop_mars
):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 200,
"object_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = False
subtask_max_retries = 0
else:
subtask_max_retries = 1

cluster = ray_large_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
enable_object_reconstruction=reconstruction_enabled,
)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
mars.new_session(
backend="ray",
config={"scheduling.subtask_max_retries": subtask_max_retries},
default=True,
)
cluster.wait_for_nodes()

df = md.DataFrame(mt.random.RandomState(0).rand(2_000_000, 1, chunk_size=1_000_000))
df.execute()
# this will submit new ray tasks
df2 = df.map_chunk(lambda pdf: pdf * 2).execute()
executed_infos = df2.fetch_infos(fields=["object_refs"])
object_refs = reduce(operator.concat, executed_infos["object_refs"])
head5 = df2.head(5).to_pandas()

cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)

# use a dependent_task to avoid fetch lost objects to local
@ray.remote
def dependent_task(x):
return x

if reconstruction_enabled:
ray.get([dependent_task.remote(ref) for ref in object_refs])
new_head5 = df2.head(5).to_pandas()
pd.testing.assert_frame_equal(head5, new_head5)
else:
with pytest.raises(ray.exceptions.RayTaskError):
df2.head(5).to_pandas()
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(object_refs)

# Losing the object a second time will cause reconstruction to fail because
# we have reached the max task retries.
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(num_cpus=1, object_store_memory=10**8)

if reconstruction_enabled:
with pytest.raises(
ray.exceptions.ObjectReconstructionFailedMaxAttemptsExceededError
):
ray.get(object_refs)
else:
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(object_refs)