[Scheduling] Expand the ability of resource evaluator #2997

Merged · 5 commits · May 6, 2022
16 changes: 13 additions & 3 deletions mars/services/task/execution/mars/executor.py
@@ -73,6 +73,7 @@ def __init__(
lifecycle_api: LifecycleAPI,
scheduling_api: SchedulingAPI,
meta_api: MetaAPI,
resource_evaluator: ResourceEvaluator,
):
self._config = config
self._task = task
@@ -94,6 +95,9 @@ def __init__(
self._subtask_decref_events = dict()
self._meta_updated_tileables = set()

# Evaluate and initialize subtasks' required resources.
self._resource_evaluator = resource_evaluator

@classmethod
async def create(
cls,
@@ -111,6 +115,12 @@ async def create(
cluster_api, lifecycle_api, scheduling_api, meta_api = await cls._get_apis(
session_id, address
)
resource_evaluator = await ResourceEvaluator.create(
config.get_execution_config(),
session_id=task.session_id,
task_id=task.task_id,
cluster_api=cluster_api,
)
return cls(
config,
task,
@@ -119,6 +129,7 @@ def __init__(
lifecycle_api,
scheduling_api,
meta_api,
resource_evaluator,
)

@classmethod
@@ -159,9 +170,7 @@ async def execute_subtask_graph(
self._meta_api,
)
await self._incref_stage(stage_processor)
# Evaluate and initialize subtasks' required resources.
resource_evaluator = ResourceEvaluator(stage_processor)
resource_evaluator.evaluate()
await self._resource_evaluator.evaluate(stage_processor)
self._stage_processors.append(stage_processor)
self._cur_stage_processor = stage_processor
# get the tiled progress for current stage
@@ -184,6 +193,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
if error_or_cancelled:
# revert result incref if error or cancelled
await self._decref_result_tileables()
await self._resource_evaluator.report()

async def get_available_band_resources(self) -> Dict[BandType, Resource]:
async for bands in self._cluster_api.watch_all_bands():
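Taken together, the executor changes give the evaluator three hook points: `create` once when the executor is built, `evaluate` once per stage before it runs, and `report` once on exit. A minimal sketch of that lifecycle, assuming the illustrative stand-ins `run_stage` and `stages` (not Mars APIs):

async def run_stage(stage_processor):
    """Illustrative stand-in for the real stage execution."""

async def execute_task(evaluator, stages):
    # Before each stage runs, the evaluator fills in
    # subtask.required_resource for every subtask of that stage.
    for stage_processor in stages:
        await evaluator.evaluate(stage_processor)
        await run_stage(stage_processor)
    # Called from __aexit__ in the real executor, after success,
    # error or cancellation alike.
    await evaluator.report()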
95 changes: 76 additions & 19 deletions mars/services/task/execution/mars/resource.py
@@ -12,28 +12,85 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Dict, Type, Any

from .....resource import Resource
from .stage import TaskStageProcessor


class ResourceEvaluator:
_name_to_resource_evaluator: Dict[str, Type["ResourceEvaluator"]] = {}


def register_resource_evaluator(evaluator_cls: Type["ResourceEvaluator"]):
_name_to_resource_evaluator[evaluator_cls.name] = evaluator_cls
return evaluator_cls


def init_default_resource_for_subtask(subtask_graph: "SubtaskGraph"): # noqa: F821
for subtask in subtask_graph.iter_nodes():
is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
subtask.required_resource = (
Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
)


class ResourceEvaluator(ABC):
"""
Evaluate and initialize the required resource of subtasks by different
configurations, e.g. fixed value by default, or some recommended values
through external services like an HBO service which could calculate an
accurate value from the running history of tasks.
Resource evaluator is used to estimate and set the resources required
by subtasks. It can be an internal service or an external service. An
internal service assigns a default (but adjustable) resource to each
subtask. An external service additionally receives the running result
of each task, so that it can accurately predict the required resources
of subtasks from the historical running information; we call this HBO
(history-based optimization).

Best practice
-------------
Follow the steps below to add a new resource evaluator:

* Inherit `ResourceEvaluator` and implement the `create`, `evaluate`
and `report` methods. `create` builds a new evaluator instance from
its config. `evaluate` estimates and sets the required resources for
the subtasks of a task stage. `report` sends the running information
and result of the task back; it may be a no-op.

* Add the default configs the new evaluator needs to `base_config.yml`
or its descendant files.

* Set `resource_evaluator` in `base_config.yml` to choose the new
evaluator when running a Mars job.
"""

def __init__(self, stage_processor: TaskStageProcessor):
self._stage_processor = stage_processor
self._subtask_graph = stage_processor.subtask_graph

def evaluate(self):
"""Here we could implement different acquisitions by state processor
configurations.
"""
for subtask in self._subtask_graph.iter_nodes():
is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
subtask.required_resource = (
Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
)
name = None

@classmethod
@abstractmethod
async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
name = config.get("resource_evaluator", "default")
evaluator_config = config.get(name, {})
evaluator_cls = _name_to_resource_evaluator[name]
return await evaluator_cls.create(evaluator_config, **kwargs)

@abstractmethod
async def evaluate(self, stage_processor: "TaskStageProcessor"): # noqa: F821
"""Called before executing a task stage."""

@abstractmethod
async def report(self):
"""Called after executing a task."""


@register_resource_evaluator
class DefaultEvaluator(ResourceEvaluator):
name = "default"

@classmethod
async def create(cls, config, **kwargs) -> "ResourceEvaluator":
return cls()

async def evaluate(self, stage_processor: "TaskStageProcessor"): # noqa: F821
init_default_resource_for_subtask(stage_processor.subtask_graph)

async def report(self):
pass
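Following the "Best practice" steps in the docstring above, here is a hedged sketch of a third-party evaluator. The imports come from this diff; the evaluator name `hbo`, the class, and its config keys are hypothetical:

from typing import Any, Dict

from mars.services.task.execution.mars.resource import (
    ResourceEvaluator,
    init_default_resource_for_subtask,
    register_resource_evaluator,
)


@register_resource_evaluator
class HboResourceEvaluator(ResourceEvaluator):
    """Hypothetical evaluator backed by an external HBO service."""

    name = "hbo"

    def __init__(self, service_address: str):
        self._service_address = service_address

    @classmethod
    async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
        # `config` is the sub-dict keyed by this evaluator's name; see
        # ResourceEvaluator.create above.
        return cls(service_address=config.get("service_address", ""))

    async def evaluate(self, stage_processor):
        # A real implementation would ask the HBO service for estimates;
        # this sketch falls back to the PR's default rule.
        init_default_resource_for_subtask(stage_processor.subtask_graph)

    async def report(self):
        # Push the finished task's metrics to the service here (no-op in
        # this sketch).
        pass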
13 changes: 13 additions & 0 deletions mars/services/task/execution/mars/tests/__init__.py
@@ -0,0 +1,13 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
100 changes: 100 additions & 0 deletions mars/services/task/execution/mars/tests/test_resource.py
@@ -0,0 +1,100 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict

import numpy as np
import pytest

from ...... import dataframe as md
from ...... import tensor as mt
from .... import Task
from ......config import Config
from ......core import Tileable, TileableGraph, ChunkGraphBuilder
from ......resource import Resource
from ....analyzer import GraphAnalyzer
from ..resource import ResourceEvaluator, register_resource_evaluator, DefaultEvaluator
from ..stage import TaskStageProcessor


@register_resource_evaluator
class MockedEvaluator(ResourceEvaluator):
name = "mock"

def __init__(self, config, **kwargs):
self._config = config

@classmethod
async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
return cls(config, **kwargs)

async def evaluate(self, stage_processor: "TaskStageProcessor"):
pass

async def report(self):
pass


def _build_chunk_graph(tileable_graph: TileableGraph):
return next(ChunkGraphBuilder(tileable_graph).build())


async def _gen_stage_processor(t):
tileable_graph = t.build_graph(tile=False)
chunk_graph = _build_chunk_graph(tileable_graph)
bands = [(f"address_{i}", "numa-0") for i in range(4)]
band_resource = dict((band, Resource(num_cpus=1)) for band in bands)
task = Task("mock_task", "mock_session", tileable_graph)
analyzer = GraphAnalyzer(chunk_graph, band_resource, task, Config(), dict())
subtask_graph = analyzer.gen_subtask_graph()
stage_processor = TaskStageProcessor(
"stage_id", task, chunk_graph, subtask_graph, bands, None, None, None
)
return stage_processor


async def _test_default_evaluator(config: Dict[str, Any], t: Tileable):
resource_evaluator = await ResourceEvaluator.create(config)
assert resource_evaluator is not None
assert isinstance(resource_evaluator, DefaultEvaluator)
stage_processor = await _gen_stage_processor(t)
await resource_evaluator.evaluate(stage_processor)
for subtask in stage_processor.subtask_graph.iter_nodes():
is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
assert subtask.required_resource == (
Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
)
assert await resource_evaluator.report() is None


@pytest.mark.asyncio
async def test_resource_evaluator():
# test mocked resource evaluator
resource_evaluator = await ResourceEvaluator.create({"resource_evaluator": "mock"})
assert resource_evaluator is not None
assert isinstance(resource_evaluator, MockedEvaluator)

# test default resource evaluator
t = mt.ones((10, 10), chunk_size=5) + 1
await _test_default_evaluator({}, t)
await _test_default_evaluator({"resource_evaluator": "default"}, t)
df = md.DataFrame(
np.random.randint(0, 100, size=(100_000, 4)),
columns=list("ABCD"),
chunk_size=1000,
)
df = df[df["A"] > 50]
await _test_default_evaluator({}, df)
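For reference, the config shape that `ResourceEvaluator.create` dispatches on, as exercised by these tests; the `hbo` entry and its keys are hypothetical:

# "resource_evaluator" selects a registered evaluator by name (falling
# back to "default" when absent); the sub-dict under that same name is
# forwarded to the chosen evaluator's create().
execution_config = {
    "resource_evaluator": "hbo",
    "hbo": {"service_address": "http://hbo.internal:8080"},  # hypothetical
}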