Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] [Ray] Support basic subtask retry and lineage reconstruction (#2969) #3097

Merged
merged 1 commit into from
May 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions mars/conftest.py
Original file line number Diff line number Diff line change
@@ -112,10 +112,8 @@ def _ray_large_cluster(request): # pragma: no cover
param = getattr(request, "param", {})
num_nodes = param.get("num_nodes", 3)
num_cpus = param.get("num_cpus", 16)
try:
from ray.cluster_utils import Cluster
except ModuleNotFoundError:
from ray._private.cluster_utils import Cluster
from ray.cluster_utils import Cluster

cluster = Cluster()
remote_nodes = []
for i in range(num_nodes):
@@ -130,11 +128,14 @@ def _ray_large_cluster(request): # pragma: no cover
except TypeError:
job_config = None
ray.init(address=cluster.address, job_config=job_config)
register_ray_serializers()
use_ray_serialization = param.get("use_ray_serialization", True)
if use_ray_serialization:
register_ray_serializers()
try:
yield
yield cluster
finally:
unregister_ray_serializers()
if use_ray_serialization:
unregister_ray_serializers()
Router.set_instance(None)
RayServer.clear()
ray.shutdown()
@@ -178,6 +179,14 @@ async def ray_create_mars_cluster(request, check_router_cleaned):
Router.set_instance(None)


@pytest.fixture
def stop_mars():
yield
import mars

mars.stop_server()


@pytest.fixture(scope="module")
def _new_test_session(check_router_cleaned):
from .deploy.oscar.tests.session import new_test_session
7 changes: 5 additions & 2 deletions mars/dataframe/contrib/raydataset/dataset.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import operator
from functools import reduce

from ....utils import lazy_import
from .mldataset import _rechunk_if_needed
@@ -59,5 +61,6 @@ def __getstate__():


def get_chunk_refs(df):
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["object_id"])
return fetched_infos["object_id"]
fetched_infos: Dict[str, List] = df.fetch_infos(["object_refs"])
object_refs = reduce(operator.concat, fetched_infos["object_refs"])
return object_refs
8 changes: 5 additions & 3 deletions mars/dataframe/contrib/raydataset/mldataset.py
Original file line number Diff line number Diff line change
@@ -104,10 +104,12 @@ def to_ray_mldataset(df, num_shards: int = None):
# chunk1 for addr1,
# chunk2 & chunk3 for addr2,
# chunk4 for addr1
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["band", "object_id"])
fetched_infos: Dict[str, List] = df.fetch_infos(fields=["bands", "object_refs"])
chunk_addr_refs: List[Tuple[Tuple, "ray.ObjectRef"]] = [
(band, object_id)
for band, object_id in zip(fetched_infos["band"], fetched_infos["object_id"])
(bands[0], object_refs[0])
for bands, object_refs in zip(
fetched_infos["bands"], fetched_infos["object_refs"]
)
]
group_to_obj_refs: Dict[str, List[ray.ObjectRef]] = _group_chunk_refs(
chunk_addr_refs, num_shards
14 changes: 10 additions & 4 deletions mars/dataframe/contrib/raydataset/tests/test_raydataset.py
Original file line number Diff line number Diff line change
@@ -59,7 +59,12 @@ async def test_convert_to_ray_dataset(
with session:
value = np.random.rand(10, 10)
chunk_size, num_shards = test_option
df: md.DataFrame = md.DataFrame(value, chunk_size=chunk_size)
# ray dataset needs str columns
df: md.DataFrame = md.DataFrame(
value,
chunk_size=chunk_size,
columns=[f"c{i}" for i in range(value.shape[1])],
)
df.execute()

ds = mdd.to_ray_dataset(df, num_shards=num_shards)
@@ -161,7 +166,8 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl
session = new_session(address=create_cluster.address, default=True)
with session:
np_X, np_y = make_regression(n_samples=1_0000, n_features=10)
X, y = md.DataFrame(np_X), md.DataFrame({"target": np_y})
columns = [f"c{i}" for i in range(np_X.shape[1])]
X, y = md.DataFrame(np_X, columns=columns), md.DataFrame({"target": np_y})
df: md.DataFrame = md.concat([md.DataFrame(X), md.DataFrame(y)], axis=1)
df.execute()

@@ -171,10 +177,10 @@ async def test_mars_with_xgboost_sklearn_reg(ray_start_regular_shared, create_cl

import gc

gc.collect() # Ensure MLDataset does hold mars dataframe to avoid gc.
gc.collect() # Ensure Dataset does hold mars dataframe to avoid gc.
ray_params = RayParams(num_actors=2, cpus_per_actor=1)
reg = RayXGBRegressor(ray_params=ray_params, random_state=42)
# train
reg.fit(RayDMatrix(ds, "target"), y=None, ray_params=ray_params)
reg.predict(RayDMatrix(ds, "target"))
reg.predict(pd.DataFrame(np_X))
reg.predict(pd.DataFrame(np_X, columns=columns))
84 changes: 57 additions & 27 deletions mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
@@ -1161,7 +1161,14 @@ async def fetch(self, *tileables, **kwargs) -> list:
return result

async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
available_fields = {"object_id", "level", "memory_size", "store_size", "band"}
available_fields = {
"object_id",
"object_refs",
"level",
"memory_size",
"store_size",
"bands",
}
if fields is None:
fields = available_fields
else:
@@ -1175,34 +1182,22 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
if kwargs: # pragma: no cover
unexpected_keys = ", ".join(list(kwargs.keys()))
raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}")

# following fields needs to access storage API to get the meta.
_need_query_storage_fields = {"level", "memory_size", "store_size"}
_need_query_storage = bool(_need_query_storage_fields & fields)
with enter_mode(build=True):
chunks = []
get_chunk_metas = []
fetch_infos_list = []
for tileable in tileables:
fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
fetch_infos = []
for chunk in fetch_tileable.chunks:
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(chunk.key, fields=["bands"])
)
fetch_infos.append(
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
chunk_to_band = {
chunk: meta["bands"][0] for chunk, meta in zip(chunks, chunk_metas)
}

chunk_to_bands, fetch_infos_list, result = await self._query_meta_service(
tileables, fields, _need_query_storage
)
if not _need_query_storage:
assert result is not None
return result
storage_api_to_gets = defaultdict(list)
storage_api_to_fetch_infos = defaultdict(list)
for fetch_info in itertools.chain(*fetch_infos_list):
chunk = fetch_info.chunk
band = chunk_to_band[chunk]
storage_api = await self._get_storage_api(band)
bands = chunk_to_bands[chunk]
storage_api = await self._get_storage_api(bands[0])
storage_api_to_gets[storage_api].append(
storage_api.get_infos.delay(chunk.key)
)
@@ -1219,7 +1214,7 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
for fetch_infos in fetch_infos_list:
fetched = defaultdict(list)
for fetch_info in fetch_infos:
band = chunk_to_band[fetch_info.chunk]
bands = chunk_to_bands[fetch_info.chunk]
# Currently there's only one item in the returned List from storage_api.get_infos()
data = fetch_info.data[0]
if "object_id" in fields:
@@ -1232,12 +1227,47 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list:
fetched["store_size"].append(data.store_size)
# data.band misses ip info, e.g. 'numa-0'
# while band doesn't, e.g. (address0, 'numa-0')
if "band" in fields:
fetched["band"].append(band)
if "bands" in fields:
fetched["bands"].append(bands)
result.append(fetched)

return result

async def _query_meta_service(self, tileables, fields, query_storage):
chunks = []
get_chunk_metas = []
fetch_infos_list = []
for tileable in tileables:
fetch_tileable, _ = self._get_to_fetch_tileable(tileable)
fetch_infos = []
for chunk in fetch_tileable.chunks:
chunks.append(chunk)
get_chunk_metas.append(
self._meta_api.get_chunk_meta.delay(
chunk.key,
fields=["bands"] if query_storage else fields,
)
)
fetch_infos.append(
ChunkFetchInfo(tileable=tileable, chunk=chunk, indexes=None)
)
fetch_infos_list.append(fetch_infos)
chunk_metas = await self._meta_api.get_chunk_meta.batch(*get_chunk_metas)
if not query_storage:
result = []
chunk_to_meta = dict(zip(chunks, chunk_metas))
for fetch_infos in fetch_infos_list:
fetched = defaultdict(list)
for fetch_info in fetch_infos:
for field in fields:
fetched[field].append(chunk_to_meta[fetch_info.chunk][field])
result.append(fetched)
return {}, fetch_infos_list, result
chunk_to_bands = {
chunk: meta["bands"] for chunk, meta in zip(chunks, chunk_metas)
}
return chunk_to_bands, fetch_infos_list, None

async def decref(self, *tileable_keys):
logger.debug("Decref tileables on client: %s", tileable_keys)
return await self._lifecycle_api.decref_tileables(list(tileable_keys))
9 changes: 7 additions & 2 deletions mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
@@ -380,7 +380,12 @@ async def test_fetch_infos(create_cluster):
assert "level" in fetched_infos
assert "memory_size" in fetched_infos
assert "store_size" in fetched_infos
assert "band" in fetched_infos
assert "bands" in fetched_infos

fetched_infos = df.fetch_infos(fields=["object_id", "bands"])
assert "object_id" in fetched_infos
assert "bands" in fetched_infos
assert len(fetched_infos) == 2

fetch_infos((df, df), fields=None)
results_infos = mr.ExecutableTuple([df, df]).execute()._fetch_infos()
@@ -389,7 +394,7 @@ async def test_fetch_infos(create_cluster):
assert "level" in results_infos[0]
assert "memory_size" in results_infos[0]
assert "store_size" in results_infos[0]
assert "band" in results_infos[0]
assert "bands" in results_infos[0]


async def _run_web_session_test(web_address):
8 changes: 8 additions & 0 deletions mars/deploy/oscar/tests/test_ray.py
Original file line number Diff line number Diff line change
@@ -14,12 +14,14 @@

import asyncio
import copy
import operator
import os
import subprocess
import sys
import tempfile
import threading
import time
from functools import reduce

import numpy as np
import pandas as pd
@@ -153,6 +155,12 @@ async def test_execute_describe(ray_start_regular, create_cluster):
@pytest.mark.asyncio
async def test_fetch_infos(ray_start_regular, create_cluster):
await test_local.test_fetch_infos(create_cluster)
df = md.DataFrame(mt.random.RandomState(0).rand(5000, 1, chunk_size=1000))
df.execute()
fetched_infos = df.fetch_infos(fields=["object_refs"])
object_refs = reduce(operator.concat, fetched_infos["object_refs"])
assert len(fetched_infos) == 1
assert len(object_refs) == 5


@require_ray
108 changes: 108 additions & 0 deletions mars/deploy/oscar/tests/test_ray_dag_failover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import operator
from functools import reduce

import pandas as pd
import pytest

import mars
from .... import dataframe as md
from .... import tensor as mt
from ....tests.core import require_ray
from ....utils import lazy_import

ray = lazy_import("ray")


@require_ray
@pytest.mark.parametrize(
"ray_large_cluster",
[{"num_nodes": 0, "use_ray_serialization": False}],
indirect=True,
)
@pytest.mark.parametrize("reconstruction_enabled", [True, False])
def test_basic_object_reconstruction(
ray_large_cluster, reconstruction_enabled, stop_mars
):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 200,
"object_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = False
subtask_max_retries = 0
else:
subtask_max_retries = 1

cluster = ray_large_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
enable_object_reconstruction=reconstruction_enabled,
)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
mars.new_session(
backend="ray",
config={"scheduling.subtask_max_retries": subtask_max_retries},
default=True,
)
cluster.wait_for_nodes()

df = md.DataFrame(mt.random.RandomState(0).rand(2_000_000, 1, chunk_size=1_000_000))
df.execute()
# this will submit new ray tasks
df2 = df.map_chunk(lambda pdf: pdf * 2).execute()
executed_infos = df2.fetch_infos(fields=["object_refs"])
object_refs = reduce(operator.concat, executed_infos["object_refs"])
head5 = df2.head(5).to_pandas()

cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)

# use a dependent_task to avoid fetch lost objects to local
@ray.remote
def dependent_task(x):
return x

if reconstruction_enabled:
ray.get([dependent_task.remote(ref) for ref in object_refs])
new_head5 = df2.head(5).to_pandas()
pd.testing.assert_frame_equal(head5, new_head5)
else:
with pytest.raises(ray.exceptions.RayTaskError):
df2.head(5).to_pandas()
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(object_refs)

# Losing the object a second time will cause reconstruction to fail because
# we have reached the max task retries.
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(num_cpus=1, object_store_memory=10**8)

if reconstruction_enabled:
with pytest.raises(
ray.exceptions.ObjectReconstructionFailedMaxAttemptsExceededError
):
ray.get(object_refs)
else:
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(object_refs)
Loading