[Ray] Implements get_chunks_result for Ray execution context (#3023)
fyrestone authored May 13, 2022
1 parent ba8a6d9 commit 6c9c4b0
Showing 7 changed files with 135 additions and 36 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/platform-ci.yml
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-vineyard, 3.8-dask]
python-version: [3.8-kubernetes, 3.8-hadoop, 3.8-ray, 3.8-ray-dag, 3.8-vineyard, 3.8-dask]
include:
- { os: ubuntu-latest, python-version: 3.8-kubernetes, no-common-tests: 1,
no-deploy: 1, with-kubernetes: "with Kubernetes" }
@@ -28,6 +28,8 @@ jobs:
no-deploy: 1, with-vineyard: "with vineyard" }
- { os: ubuntu-latest, python-version: 3.8-ray, no-common-tests: 1,
no-deploy: 1, with-ray: "with ray" }
- { os: ubuntu-latest, python-version: 3.8-ray-dag, no-common-tests: 1,
no-deploy: 1, with-ray-dag: "with ray dag" }
- { os: ubuntu-latest, python-version: 3.8-dask, no-common-tests: 1,
no-deploy: 1, run-dask: "run dask" }

@@ -51,6 +53,7 @@ jobs:
WITH_KUBERNETES: ${{ matrix.with-kubernetes }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
WITH_RAY_DAG: ${{ matrix.with-ray-dag }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
shell: bash
@@ -67,7 +70,7 @@ jobs:
if [[ $UNAME == "windows" ]]; then
pip install virtualenv flaky
else
pip install virtualenv flaky ray
pip install virtualenv flaky
if [ -n "$WITH_KUBERNETES" ]; then
./.github/workflows/install-minikube.sh
pip install kubernetes
@@ -90,7 +93,7 @@ jobs:
sudo mv /tmp/etcd-download-test/etcdctl /usr/local/bin/
rm -fr /tmp/etcd-$ETCD_VER-linux-amd64.tar.gz /tmp/etcd-download-test
fi
if [ -n "$WITH_RAY" ]; then
if [ -n "$WITH_RAY" ] || [ -n "$WITH_RAY_DAG" ]; then
pip install ray[default]==1.9.2
pip install "xgboost_ray==0.1.5" "xgboost<1.6.0"
fi
@@ -107,6 +110,7 @@ jobs:
WITH_CYTHON: ${{ matrix.with-cython }}
WITH_VINEYARD: ${{ matrix.with-vineyard }}
WITH_RAY: ${{ matrix.with-ray }}
WITH_RAY_DAG: ${{ matrix.with-ray-dag }}
RUN_DASK: ${{ matrix.run-dask }}
NO_COMMON_TESTS: ${{ matrix.no-common-tests }}
NUMPY_EXPERIMENTAL_ARRAY_FUNCTION: 1
@@ -143,6 +147,11 @@ jobs:
pytest $PYTEST_CONFIG --durations=0 --timeout=600 -v -s -m ray
coverage report
fi
if [ -n "$WITH_RAY_DAG" ]; then
export MARS_CI_BACKEND=ray
pytest $PYTEST_CONFIG --durations=0 --timeout=600 -v -s -m ray_dag
coverage report
fi
if [ -n "$RUN_DASK" ]; then
pytest $PYTEST_CONFIG mars/contrib/dask/tests/test_dask.py
coverage report
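
The new 3.8-ray-dag job reuses the Ray installation steps and switches the Mars test backend to Ray through the MARS_CI_BACKEND environment variable before selecting only the ray_dag-marked tests. A rough sketch of what that test step boils down to, written in Python instead of the workflow's shell (the $PYTEST_CONFIG coverage options are supplied by the workflow and omitted here; pytest-timeout is assumed to be available):

    import os
    import pytest

    # Point Mars' test fixtures at the Ray execution backend (see mars/conftest.py
    # below), then run only the tests carrying the new `ray_dag` marker.
    os.environ["MARS_CI_BACKEND"] = "ray"
    exit_code = pytest.main(["--timeout=600", "-v", "-s", "-m", "ray_dag"])
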
15 changes: 13 additions & 2 deletions mars/conftest.py
@@ -29,6 +29,7 @@
from mars.utils import lazy_import

ray = lazy_import("ray")
MARS_CI_BACKEND = os.environ.get("MARS_CI_BACKEND", "mars")


@pytest.fixture(scope="module")
@@ -167,7 +168,11 @@ def _new_test_session(_stop_isolation):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
address="test://127.0.0.1", init_local=True, default=True, timeout=300
address="test://127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
default=True,
timeout=300,
)
with option_context({"show_progress": False}):
try:
@@ -181,7 +186,12 @@ def _new_integrated_test_session(_stop_isolation):
from .deploy.oscar.tests.session import new_test_session

sess = new_test_session(
address="127.0.0.1", init_local=True, n_worker=2, default=True, timeout=300
address="127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
n_worker=2,
default=True,
timeout=300,
)
with option_context({"show_progress": False}):
try:
@@ -213,6 +223,7 @@ def _new_gpu_test_session(_stop_isolation): # pragma: no cover

sess = new_test_session(
address="127.0.0.1",
backend=MARS_CI_BACKEND,
init_local=True,
n_worker=1,
n_cpu=1,
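
All three session fixtures now read the backend from the environment once and thread it into new_test_session, so the same suite runs against either the default Mars backend or the Ray backend without code changes. A self-contained sketch of that pattern (the variable name and default mirror the diff; new_test_session itself is Mars' helper and is not reproduced):

    import os
    import pytest

    # Chosen once at import time; the ray-dag CI job exports MARS_CI_BACKEND=ray.
    MARS_CI_BACKEND = os.environ.get("MARS_CI_BACKEND", "mars")

    @pytest.fixture(scope="module")
    def backend_name():
        # Fixtures and tests can branch on, or pass along, the configured backend.
        return MARS_CI_BACKEND
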
1 change: 1 addition & 0 deletions mars/dataframe/base/tests/test_base_execution.py
@@ -774,6 +774,7 @@ def test_isin_execution(setup):
pd.testing.assert_frame_equal(result, expected)


@pytest.mark.ray_dag
def test_cut_execution(setup):
session = setup

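
Adding the ray_dag marker to test_cut_execution is what opts it into the new CI leg: pytest -m ray_dag selects only marked tests, and with MARS_CI_BACKEND=ray the setup fixture hands them a Ray-backed session. A minimal, illustrative sketch of the same opt-in pattern (the test body is hypothetical):

    import pytest

    @pytest.mark.ray_dag  # selected by `pytest -m ray_dag` in the ray-dag CI job
    def test_runs_against_ray_backend(setup):
        # `setup` yields a session whose backend follows MARS_CI_BACKEND.
        ...
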
25 changes: 22 additions & 3 deletions mars/services/task/execution/ray/context.py
@@ -14,13 +14,15 @@

import functools
import inspect
from typing import Union
import logging
from typing import Union, Dict, List

from .....core.context import Context
from .....utils import implements, lazy_import
from ....context import ThreadedServiceContext

ray = lazy_import("ray")
logger = logging.getLogger(__name__)


class RayRemoteObjectManager:
@@ -89,7 +91,14 @@ def _get_task_state_actor(self) -> "ray.actor.ActorHandle":
@implements(Context.create_remote_object)
def create_remote_object(self, name: str, object_cls, *args, **kwargs):
task_state_actor = self._get_task_state_actor()
task_state_actor.create_remote_object.remote(name, object_cls, *args, **kwargs)
r = task_state_actor.create_remote_object.remote(
name, object_cls, *args, **kwargs
)
# Make sure the remote object is created before we return. A later
# get_remote_object call from a worker may otherwise miss it, because
# create_remote_object and get_remote_object are not necessarily called
# from the same worker. A synchronous Ray actor would require this
# `ray.get`, too.
ray.get(r)
return _RayRemoteObjectWrapper(task_state_actor, name)

@implements(Context.get_remote_object)
@@ -107,7 +116,17 @@ def destroy_remote_object(self, name: str):
class RayExecutionContext(_RayRemoteObjectContext, ThreadedServiceContext):
"""The context for tiling."""

pass
def __init__(self, task_context: Dict, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task_context = task_context

@implements(Context.get_chunks_result)
def get_chunks_result(self, data_keys: List[str]) -> List:
logger.info("Getting %s chunks result.", len(data_keys))
object_refs = [self._task_context[key] for key in data_keys]
result = ray.get(object_refs)
logger.info("Got %s chunks result.", len(result))
return result


# TODO(fyrestone): Implement more APIs for Ray.
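
get_chunks_result is the core of this commit: the tiling context now holds the executor's task_context, a mapping from chunk keys to Ray ObjectRefs, so fetching chunk results reduces to one ray.get over the refs of the requested keys. A minimal sketch of that key-to-ref resolution (it mirrors the unit test added near the end of this commit; a running Ray instance is assumed):

    import ray

    ray.init()
    # chunk key -> ObjectRef, as built by the executor while submitting subtasks
    task_context = {"chunk_key_1": ray.put(123)}

    def get_chunks_result(data_keys):
        # Same shape as RayExecutionContext.get_chunks_result: resolve keys, then ray.get.
        object_refs = [task_context[key] for key in data_keys]
        return ray.get(object_refs)

    assert get_chunks_result(["chunk_key_1"]) == [123]
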
77 changes: 51 additions & 26 deletions mars/services/task/execution/ray/executor.py
@@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import functools
import logging
from typing import List, Dict, Any, Set
from .....core import ChunkGraph, Chunk, TileContext
@@ -123,22 +124,22 @@ def __init__(
config: ExecutionConfig,
task: Task,
tile_context: TileContext,
ray_executor: "ray.remote_function.RemoteFunction",
task_context: Dict[str, "ray.ObjectRef"],
task_state_actor: "ray.actor.ActorHandle",
lifecycle_api: LifecycleAPI,
meta_api: MetaAPI,
):
self._config = config
self._task = task
self._tile_context = tile_context
self._ray_executor = ray_executor
self._task_context = task_context
self._task_state_actor = task_state_actor
self._ray_executor = self._get_ray_executor()

# api
self._lifecycle_api = lifecycle_api
self._meta_api = meta_api

self._task_context = {}
self._available_band_resources = None

# For progress
@@ -158,19 +159,19 @@ async def create(
tile_context: TileContext,
**kwargs,
) -> "TaskExecutor":
ray_executor = ray.remote(execute_subtask)
lifecycle_api, meta_api = await cls._get_apis(session_id, address)
task_state_actor = (
ray.remote(RayTaskState)
.options(name=RayTaskState.gen_name(task.task_id))
.remote()
)
await cls._init_context(task_state_actor, session_id, address)
task_context = {}
await cls._init_context(task_context, task_state_actor, session_id, address)
return cls(
config,
task,
tile_context,
ray_executor,
task_context,
task_state_actor,
lifecycle_api,
meta_api,
@@ -184,13 +185,29 @@ async def _get_apis(cls, session_id: str, address: str):
MetaAPI.create(session_id, address),
)

@staticmethod
@functools.lru_cache(maxsize=1)
def _get_ray_executor():
# Export remote function once.
return ray.remote(execute_subtask)

@classmethod
async def _init_context(
cls, task_state_actor: "ray.actor.ActorHandle", session_id: str, address: str
cls,
task_context: Dict[str, "ray.ObjectRef"],
task_state_actor: "ray.actor.ActorHandle",
session_id: str,
address: str,
):
loop = asyncio.get_running_loop()
context = RayExecutionContext(
task_state_actor, session_id, address, address, address, loop=loop
task_context,
task_state_actor,
session_id,
address,
address,
address,
loop=loop,
)
await context.init()
set_context(context)
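
_get_ray_executor is a cached static method, so ray.remote(execute_subtask) is evaluated once per process and every executor instance reuses the same remote function rather than re-wrapping (and re-exporting) it for each task. A small sketch of the same idea outside Mars (the wrapped function here is illustrative):

    import functools
    import ray

    def execute_piece(x):
        return x + 1

    @functools.lru_cache(maxsize=1)
    def get_ray_executor():
        # ray.remote() builds a RemoteFunction; caching it means the function is
        # wrapped, and later exported to the cluster, only once.
        return ray.remote(execute_piece)

    ray.init()
    assert ray.get(get_ray_executor().remote(41)) == 42
    assert get_ray_executor() is get_ray_executor()  # same cached RemoteFunction
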
@@ -204,7 +221,7 @@ async def execute_subtask_graph(
context: Any = None,
) -> Dict[Chunk, ExecutionChunkResult]:
logger.info("Stage %s start.", stage_id)
context = self._task_context
task_context = self._task_context
output_meta_object_refs = []
self._pre_all_stages_tile_progress = (
self._pre_all_stages_tile_progress + self._cur_stage_tile_progress
@@ -221,7 +238,7 @@ for subtask in subtask_graph.topological_iter():
for subtask in subtask_graph.topological_iter():
subtask_chunk_graph = subtask.chunk_graph
key_to_input = await self._load_subtask_inputs(
stage_id, subtask, subtask_chunk_graph, context
stage_id, subtask, subtask_chunk_graph, task_context
)
output_keys = self._get_subtask_output_keys(subtask_chunk_graph)
output_meta_keys = result_meta_keys & output_keys
@@ -245,32 +262,34 @@
meta_object_ref, *output_object_refs = output_object_refs
# TODO(fyrestone): Fetch (not get) the meta object here.
output_meta_object_refs.append(meta_object_ref)
context.update(zip(output_keys, output_object_refs))
task_context.update(zip(output_keys, output_object_refs))
logger.info("Submitted %s subtasks of stage %s.", len(subtask_graph), stage_id)

key_to_meta = {}
if len(output_meta_object_refs) > 0:
# TODO(fyrestone): Optimize meta updates by fetching partial meta.
meta_count = len(output_meta_object_refs)
logger.info("Getting %s metas of stage %s.", meta_count, stage_id)
meta_list = await asyncio.gather(*output_meta_object_refs)
for meta in meta_list:
key_to_meta.update(meta)
assert len(key_to_meta) == len(result_meta_keys)
logger.info(
"Got %s metas of stage %s.", len(output_meta_object_refs), stage_id
)
logger.info("Got %s metas of stage %s.", meta_count, stage_id)

chunk_to_meta = {}
output_object_refs = []
# ray.wait requires the object refs in the list to be unique.
output_object_refs = set()
for chunk in chunk_graph.result_chunks:
chunk_key = chunk.key
object_ref = context[chunk_key]
output_object_refs.append(object_ref)
object_ref = task_context[chunk_key]
output_object_refs.add(object_ref)
chunk_meta = key_to_meta.get(chunk_key)
if chunk_meta is not None:
chunk_to_meta[chunk] = ExecutionChunkResult(chunk_meta, object_ref)

logger.info("Waiting for stage %s complete.", stage_id)
ray.wait(output_object_refs, fetch_local=False)
# asyncio.to_thread is patched in for Python < 3.9 at mars/lib/aio/__init__.py.
await asyncio.to_thread(ray.wait, list(output_object_refs), fetch_local=False)
# Just use `self._cur_stage_tile_progress` as the current stage progress,
# because the current stage is finished and its progress is 1.
self._pre_all_stages_progress += self._cur_stage_tile_progress
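
Two details in this hunk are easy to miss: ray.wait requires the refs it receives to be unique, hence collecting them into a set, and it blocks the calling thread, hence wrapping it in asyncio.to_thread (back-ported for Python < 3.9 in mars/lib/aio) so the event loop stays responsive while the stage finishes. A rough sketch of the combined pattern, assuming Python 3.9+ and a running Ray instance:

    import asyncio
    import ray

    async def wait_for_stage(output_object_refs):
        # De-duplicate refs (ray.wait rejects duplicates), then block in a worker
        # thread instead of on the asyncio event loop.
        refs = list(set(output_object_refs))
        ready, _ = await asyncio.to_thread(
            ray.wait, refs, num_returns=len(refs), fetch_local=False
        )
        return ready
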
@@ -289,14 +308,20 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
chunk_keys = []
for chunk in self._tile_context[tileable].chunks:
chunk_keys.append(chunk.key)
object_ref = self._task_context[chunk.key]
update_metas.append(
self._meta_api.set_chunk_meta.delay(
chunk,
bands=[],
object_ref=object_ref,
if chunk.key in self._task_context:
# Some tileable graphs may have result chunks that are not executed,
# for example:
# r, b = cut(series, bins, retbins=True)
# r_result = r.execute().fetch()
# b_result = b.execute().fetch() <- This is the case
object_ref = self._task_context[chunk.key]
update_metas.append(
self._meta_api.set_chunk_meta.delay(
chunk,
bands=[],
object_ref=object_ref,
)
)
)
update_lifecycles.append(
self._lifecycle_api.track.delay(tileable.key, chunk_keys)
)
@@ -325,7 +350,7 @@ async def get_progress(self) -> float:
finished_objects, _ = ray.wait(
self._cur_stage_output_object_refs,
num_returns=total,
timeout=0.1,
timeout=0, # Avoid blocking the asyncio loop.
fetch_local=False,
)
stage_progress = (
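
Progress polling changes as well: ray.wait(..., timeout=0) returns immediately with whatever refs have already finished, so get_progress no longer stalls the asyncio loop for up to 0.1 s per call the way the previous timeout did. A minimal sketch of such a non-blocking poll (a running Ray instance is assumed):

    import ray

    def count_finished(object_refs):
        # timeout=0 never waits: it only partitions refs into finished/pending
        # based on what is already available.
        finished, pending = ray.wait(
            list(object_refs), num_returns=len(object_refs), timeout=0, fetch_local=False
        )
        return len(finished), len(pending)
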
@@ -20,10 +20,15 @@

from ......core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
from ......serialization import serialize
from ......tests.core import require_ray
from ......tests.core import require_ray, mock
from ......utils import lazy_import, get_chunk_params
from .....context import ThreadedServiceContext
from ....core import new_task_id
from ..context import RayRemoteObjectManager, _RayRemoteObjectContext
from ..context import (
RayExecutionContext,
RayRemoteObjectManager,
_RayRemoteObjectContext,
)
from ..executor import execute_subtask
from ..fetcher import RayFetcher

@@ -119,3 +124,27 @@ async def bar(self, a, b):
context.destroy_remote_object(name)
with pytest.raises(KeyError):
remote_object.foo(3, 4)

class MyException(Exception):
pass

class _ErrorRemoteObject:
def __init__(self):
raise MyException()

with pytest.raises(MyException):
context.create_remote_object(name, _ErrorRemoteObject)


@require_ray
def test_get_chunks_result(ray_start_regular_shared2):
value = 123
o = ray.put(value)

def fake_init(self):
pass

with mock.patch.object(ThreadedServiceContext, "__init__", new=fake_init):
context = RayExecutionContext({"abc": o}, None)
r = context.get_chunks_result(["abc"])
assert r == [value]