Add basic profiling support for supervisor (#2586)

fyrestone authored Dec 2, 2021
1 parent 578cf63 commit ea8dc9f
Showing 20 changed files with 516 additions and 84 deletions.
15 changes: 14 additions & 1 deletion mars/_utils.pyx
@@ -16,6 +16,7 @@ import importlib
import os
import pickle
import pkgutil
import time
import types
import uuid
from datetime import date, datetime, timedelta, tzinfo
@@ -411,5 +412,17 @@ cpdef long long ceildiv(long long x, long long y) nogil:
return x // y + (x % y != 0)


cdef class Timer:
cdef object _start
cdef readonly object duration

def __enter__(self):
self._start = time.time()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.duration = time.time() - self._start


__all__ = ['to_str', 'to_binary', 'to_text', 'TypeDispatcher', 'tokenize', 'tokenize_int',
'register_tokenizer', 'insert_reversed_tuple', 'ceildiv']
'register_tokenizer', 'insert_reversed_tuple', 'ceildiv', 'Timer']
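
The new Timer is a small Cython context manager that records wall-clock duration via time.time(). A minimal usage sketch, assuming the extension module is built and importable as mars._utils (the measured workload is a placeholder):

    from mars._utils import Timer

    with Timer() as timer:
        sum(range(1_000_000))  # placeholder workload to time

    print(f"elapsed: {timer.duration:.6f} s")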
34 changes: 30 additions & 4 deletions mars/deploy/oscar/session.py
@@ -16,6 +16,7 @@
import concurrent.futures
import itertools
import logging
import json
import random
import string
import threading
@@ -71,15 +72,22 @@ class Progress:
value: float = 0.0


@dataclass
class Profiling:
result: dict = None


class ExecutionInfo:
def __init__(
self,
aio_task: asyncio.Task,
progress: Progress,
profiling: Profiling,
loop: asyncio.AbstractEventLoop,
):
self._aio_task = aio_task
self._progress = progress
self._profiling = profiling
self._loop = loop

self._future_local = threading.local()
@@ -108,6 +116,9 @@ def aio_task(self):
def progress(self) -> float:
return self._progress.value

def profiling_result(self) -> dict:
return self._profiling.result

def result(self, timeout=None):
self._ensure_future()
return self._future_local.future.result(timeout=timeout)
@@ -805,7 +816,11 @@ async def init(
return await cls._init(address, session_id, new=new, timeout=timeout)

async def _run_in_background(
self, tileables: list, task_id: str, progress: Progress
self,
tileables: list,
task_id: str,
progress: Progress,
profiling: Profiling,
):
with enter_mode(build=True, kernel=True):
# wait for task to finish
@@ -850,6 +865,13 @@ async def _run_in_background(
raise TimeoutError(
f"Task({task_id}) running time > {self.timeout}"
)
profiling.result = task_result.profiling
if task_result.profiling:
logger.warning(
"Profile task %s execution result:\n%s",
task_id,
json.dumps(task_result.profiling, indent=4),
)
if task_result.error:
raise task_result.error.with_traceback(task_result.traceback)
if cancelled:
@@ -886,11 +908,12 @@ async def execute(self, *tileables, **kwargs) -> ExecutionInfo:
)

progress = Progress()
profiling = Profiling()
# create asyncio.Task
aio_task = asyncio.create_task(
self._run_in_background(tileables, task_id, progress)
self._run_in_background(tileables, task_id, progress, profiling)
)
return ExecutionInfo(aio_task, progress, asyncio.get_running_loop())
return ExecutionInfo(aio_task, progress, profiling, asyncio.get_running_loop())

def _get_to_fetch_tileable(
self, tileable: TileableType
@@ -1537,7 +1560,10 @@ async def driver():
driver(), execution_info.loop
).result()
new_execution_info = ExecutionInfo(
new_aio_task, execution_info._progress, execution_info.loop
new_aio_task,
execution_info._progress,
execution_info._profiling,
execution_info.loop,
)
return new_execution_info

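Taken together, the session changes let callers opt into profiling per execution and read the aggregated timings back. A minimal sketch based on the test changes that follow, assuming a local cluster and default session already exist; the tensor expression is illustrative:

    import numpy as np
    import mars.tensor as mt
    from mars.deploy.oscar.session import get_default_async_session

    async def profile_once():
        session = get_default_async_session()
        raw = np.random.RandomState(0).rand(10, 10)
        b = mt.tensor(raw, chunk_size=5) + 1
        info = await session.execute(b, extra_config={"enable_profiling": True})
        await info
        # dict shaped like {"supervisor": {"general": {...}, "serialization": {...}}}
        return info.profiling_result()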
45 changes: 41 additions & 4 deletions mars/deploy/oscar/tests/test_local.py
@@ -37,6 +37,7 @@
from ....storage import StorageLevel
from ....services.storage import StorageAPI
from ....tensor.arithmetic.add import TensorAdd
from ....tests.core import check_dict_structure_same
from ..local import new_cluster
from ..service import load_config
from ..session import (
@@ -67,6 +68,22 @@
os.path.dirname(__file__), "local_test_with_third_parity_modules_config.yml"
)

EXPECT_PROFILING_STRUCTURE = {
"supervisor": {
"general": {
"optimize": 0.0005879402160644531,
"incref_fetch_tileables": 0.0010840892791748047,
"stage_*": {
"tile": 0.008243083953857422,
"gen_subtask_graph": 0.012202978134155273,
"run": 0.27870702743530273,
"total": 0.30318617820739746,
},
"total": 0.30951380729675293,
},
"serialization": {},
}
}

params = ["default"]
if vineyard is not None:
@@ -147,8 +164,15 @@ async def test_vineyard_operators(create_cluster):
pd.testing.assert_frame_equal(df, raw)


@pytest.mark.parametrize(
"config",
[
[{"enable_profiling": True}, EXPECT_PROFILING_STRUCTURE],
[{}, {}],
],
)
@pytest.mark.asyncio
async def test_execute(create_cluster):
async def test_execute(create_cluster, config):
session = get_default_async_session()
assert session.address is not None
assert session.session_id is not None
@@ -157,8 +181,14 @@ async def test_execute(create_cluster):
a = mt.tensor(raw, chunk_size=5)
b = a + 1

info = await session.execute(b)
extra_config, expect_profiling_structure = config

info = await session.execute(b, extra_config=extra_config)
await info
if extra_config:
check_dict_structure_same(info.profiling_result(), expect_profiling_structure)
else:
assert not info.profiling_result()
assert info.result() is None
assert info.exception() is None
assert info.progress() == 1
@@ -296,16 +326,23 @@ def _my_func():
await session.destroy()


@pytest.mark.parametrize(
"config",
[
[{"enable_profiling": True}, EXPECT_PROFILING_STRUCTURE],
[{}, {}],
],
)
@pytest.mark.asyncio
async def test_web_session(create_cluster):
async def test_web_session(create_cluster, config):
client = create_cluster[0]
session_id = str(uuid.uuid4())
web_address = client.web_address
session = await AsyncSession.init(web_address, session_id)
assert await session.get_web_endpoint() == web_address
session.as_default()
assert isinstance(session._isolated_session, _IsolatedWebSession)
await test_execute(client)
await test_execute(client, config)
await test_iterative_tiling(client)
AsyncSession.reset_default()
await session.destroy()
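check_dict_structure_same is imported from ....tests.core but not shown in this commit; the expected structure above suggests it compares nested keys while ignoring the timing values, with "stage_*" acting as a wildcard. A hedged sketch of that kind of helper, where the wildcard rule and exact semantics are assumptions rather than the actual implementation:

    def check_dict_structure_same(actual: dict, expected: dict):
        # Compare nested key structure only; timing values are ignored.
        def find_key(exp_key, actual_keys):
            if exp_key.endswith("*"):  # e.g. "stage_*" matches "stage_<stage_id>"
                return next((k for k in actual_keys if k.startswith(exp_key[:-1])), None)
            return exp_key if exp_key in actual_keys else None

        assert len(actual) == len(expected), (actual, expected)
        for exp_key, exp_value in expected.items():
            act_key = find_key(exp_key, actual)
            assert act_key is not None, f"missing key {exp_key!r}"
            if isinstance(exp_value, dict):
                check_dict_structure_same(actual[act_key], exp_value)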
43 changes: 39 additions & 4 deletions mars/deploy/oscar/tests/test_ray.py
@@ -56,6 +56,27 @@
os.path.dirname(__file__), "ray_test_with_third_parity_modules_config.yml"
)

EXPECT_PROFILING_STRUCTURE = {
"supervisor": {
"general": {
"optimize": 0.0005879402160644531,
"incref_fetch_tileables": 0.0010840892791748047,
"stage_*": {
"tile": 0.008243083953857422,
"gen_subtask_graph": 0.012202978134155273,
"run": 0.27870702743530273,
"total": 0.30318617820739746,
},
"total": 0.30951380729675293,
},
"serialization": {
"serialize": 0.014928340911865234,
"deserialize": 0.0011813640594482422,
"total": 0.016109704971313477,
},
}
}


@pytest.fixture
async def create_cluster(request):
@@ -74,9 +95,16 @@ async def create_cluster(request):


@require_ray
@pytest.mark.parametrize(
"config",
[
[{"enable_profiling": True}, EXPECT_PROFILING_STRUCTURE],
[{}, {}],
],
)
@pytest.mark.asyncio
async def test_execute(ray_large_cluster, create_cluster):
await test_local.test_execute(create_cluster)
async def test_execute(ray_large_cluster, create_cluster, config):
await test_local.test_execute(create_cluster, config)


@require_ray
@@ -194,10 +222,17 @@ async def test_optional_supervisor_node(ray_large_cluster, test_option):


@require_ray
@pytest.mark.parametrize(
"config",
[
[{"enable_profiling": True}, EXPECT_PROFILING_STRUCTURE],
[{}, {}],
],
)
@pytest.mark.asyncio
async def test_web_session(ray_large_cluster, create_cluster):
async def test_web_session(ray_large_cluster, create_cluster, config):
client = create_cluster[0]
await test_local.test_web_session(create_cluster)
await test_local.test_web_session(create_cluster, config)
web_address = client.web_address
assert await ray.remote(_run_web_session).remote(web_address)
assert await ray.remote(_sync_web_session_test).remote(web_address)
13 changes: 11 additions & 2 deletions mars/oscar/backends/context.py
@@ -38,6 +38,7 @@
CancelMessage,
ControlMessage,
ControlMessageType,
ProfilingContext,
)
from .router import Router

@@ -155,10 +156,18 @@ async def actor_ref(self, *args, **kwargs):
return self._process_result_message(result)

async def send(
self, actor_ref: ActorRef, message: Tuple, wait_response: bool = True
self,
actor_ref: ActorRef,
message: Tuple,
wait_response: bool = True,
profiling_context: ProfilingContext = None,
):
message = SendMessage(
new_message_id(), actor_ref, message, protocol=DEFAULT_PROTOCOL
new_message_id(),
actor_ref,
message,
protocol=DEFAULT_PROTOCOL,
profiling_context=profiling_context,
)

with debug_async_timeout(
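With the extended signature, supervisor-side code can attach a ProfilingContext when sending a message so timings can be attributed to a task. A sketch of the call shape only; ctx (an actor context), actor_ref, message, and task_id are assumed to be provided by the surrounding actor machinery:

    from mars.oscar.backends.message import ProfilingContext

    result = await ctx.send(
        actor_ref,
        message,  # method-call payload built by the actor-ref layer
        wait_response=True,
        profiling_context=ProfilingContext(task_id=task_id),
    )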
26 changes: 23 additions & 3 deletions mars/oscar/backends/message.py
@@ -69,14 +69,21 @@ class MessageTraceItem:
method: str


@dataslots
@dataclass
class ProfilingContext:
task_id: str


class _MessageBase(ABC):
__slots__ = "protocol", "message_id", "message_trace"
__slots__ = "protocol", "message_id", "message_trace", "profiling_context"

def __init__(
self,
message_id: bytes,
protocol: int = None,
message_trace: List[MessageTraceItem] = None,
profiling_context: ProfilingContext = None,
):
self.message_id = message_id
if protocol is None:
@@ -91,6 +98,7 @@ def __init__(
# `A` will find that id:1 already exists in inbox,
# thus deadlock detected.
self.message_trace = message_trace
self.profiling_context = profiling_context

@classproperty
@abstractmethod
@@ -144,8 +152,14 @@ def __init__(
result: Any,
protocol: int = None,
message_trace: List[MessageTraceItem] = None,
profiling_context: ProfilingContext = None,
):
super().__init__(message_id, protocol=protocol, message_trace=message_trace)
super().__init__(
message_id,
protocol=protocol,
message_trace=message_trace,
profiling_context=profiling_context,
)
self.result = result

@classproperty
@@ -276,8 +290,14 @@ def __init__(
content: Any,
protocol: int = None,
message_trace: List[MessageTraceItem] = None,
profiling_context: ProfilingContext = None,
):
super().__init__(message_id, protocol=protocol, message_trace=message_trace)
super().__init__(
message_id,
protocol=protocol,
message_trace=message_trace,
profiling_context=profiling_context,
)
self.actor_ref = actor_ref
self.content = content

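Because profiling_context now lives on _MessageBase, it travels with a SendMessage and is echoed back on the ResultMessage (see pool.py below). A minimal construction sketch; the actor ref and message content are placeholders:

    from mars.oscar.backends.message import ProfilingContext, SendMessage, new_message_id

    msg = SendMessage(
        new_message_id(),
        actor_ref,           # an existing ActorRef, assumed
        ("ping", (), {}),    # placeholder content
        profiling_context=ProfilingContext(task_id="task-abc"),
    )
    assert msg.profiling_context.task_id == "task-abc"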
10 changes: 8 additions & 2 deletions mars/oscar/backends/pool.py
@@ -523,7 +523,10 @@ async def send(self, message: SendMessage) -> result_message_type:
with self._run_coro(message.message_id, coro) as future:
result = await future
processor.result = ResultMessage(
message.message_id, result, protocol=message.protocol
message.message_id,
result,
protocol=message.protocol,
profiling_context=message.profiling_context,
)
return processor.result

@@ -538,7 +541,10 @@ async def tell(self, message: TellMessage) -> result_message_type:
asyncio.create_task(call)
await asyncio.sleep(0)
processor.result = ResultMessage(
message.message_id, None, protocol=message.protocol
message.message_id,
None,
protocol=message.protocol,
profiling_context=message.profiling_context,
)
return processor.result
