diff --git a/mars/services/task/execution/mars/executor.py b/mars/services/task/execution/mars/executor.py
index 7ee7f6ae43..d600a93634 100644
--- a/mars/services/task/execution/mars/executor.py
+++ b/mars/services/task/execution/mars/executor.py
@@ -73,6 +73,7 @@ def __init__(
         lifecycle_api: LifecycleAPI,
         scheduling_api: SchedulingAPI,
         meta_api: MetaAPI,
+        resource_evaluator: ResourceEvaluator,
     ):
         self._config = config
         self._task = task
@@ -94,6 +95,9 @@ def __init__(
         self._subtask_decref_events = dict()
         self._meta_updated_tileables = set()
 
+        # Evaluates and initializes the resources required by subtasks.
+        self._resource_evaluator = resource_evaluator
+
     @classmethod
     async def create(
         cls,
@@ -111,6 +115,12 @@ async def create(
         cluster_api, lifecycle_api, scheduling_api, meta_api = await cls._get_apis(
             session_id, address
         )
+        resource_evaluator = await ResourceEvaluator.create(
+            config.get_execution_config(),
+            session_id=task.session_id,
+            task_id=task.task_id,
+            cluster_api=cluster_api,
+        )
         return cls(
             config,
             task,
@@ -119,6 +129,7 @@ async def create(
             lifecycle_api,
             scheduling_api,
             meta_api,
+            resource_evaluator,
         )
 
     @classmethod
@@ -159,9 +170,7 @@ async def execute_subtask_graph(
             self._meta_api,
         )
         await self._incref_stage(stage_processor)
-        # Evaluate and initialize subtasks required resource.
-        resource_evaluator = ResourceEvaluator(stage_processor)
-        resource_evaluator.evaluate()
+        await self._resource_evaluator.evaluate(stage_processor)
         self._stage_processors.append(stage_processor)
         self._cur_stage_processor = stage_processor
         # get the tiled progress for current stage
@@ -184,6 +193,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
         if error_or_cancelled:
             # revert result incref if error or cancelled
             await self._decref_result_tileables()
+        await self._resource_evaluator.report()
 
     async def get_available_band_resources(self) -> Dict[BandType, Resource]:
         async for bands in self._cluster_api.watch_all_bands():
diff --git a/mars/services/task/execution/mars/resource.py b/mars/services/task/execution/mars/resource.py
index d31e806986..4fe385438b 100644
--- a/mars/services/task/execution/mars/resource.py
+++ b/mars/services/task/execution/mars/resource.py
@@ -12,28 +12,85 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from abc import ABC, abstractmethod
+from typing import Dict, Type, Any
+
 from .....resource import Resource
-from .stage import TaskStageProcessor
 
 
-class ResourceEvaluator:
+_name_to_resource_evaluator: Dict[str, Type["ResourceEvaluator"]] = {}
+
+
+def register_resource_evaluator(evaluator_cls: Type["ResourceEvaluator"]):
+    _name_to_resource_evaluator[evaluator_cls.name] = evaluator_cls
+    return evaluator_cls
+
+
+def init_default_resource_for_subtask(subtask_graph: "SubtaskGraph"):  # noqa: F821
+    for subtask in subtask_graph.iter_nodes():
+        is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
+        subtask.required_resource = (
+            Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
+        )
+
+
+class ResourceEvaluator(ABC):
     """
-    Evaluate and initialize the required resource of subtasks by different
-    configurations, e.g. fixed value by default, or some recommended values
-    through external services like an HBO service which could calculate a
-    accurate value by the running history of tasks.
+    A resource evaluator estimates and sets the resources required by
+    subtasks, backed by either an internal or an external service. An
+    internal service simply assigns default or pre-configured resources
+    to subtasks. An external service is additionally sent reports of
+    finished tasks, so that it can predict the resources subtasks need
+    from historical runs; we call this HBO (history-based optimization).
+
+    Best practice
+    -------------
+    Follow the steps below to add a new resource evaluator:
+      * Inherit `ResourceEvaluator` and implement the `create`, `evaluate`
+        and `report` methods. `create` constructs an evaluator instance;
+        `evaluate` estimates and sets the required resources for the
+        subtasks of a task stage and needs a real implementation in
+        every evaluator; `report` feeds the task's running information
+        back to the service and may simply be a no-op.
+
+      * Add the default configs the new evaluator needs to
+        `base_config.yml` or its descendant files.
+
+      * Set `resource_evaluator` in `base_config.yml` to choose the
+        evaluator to use when running a Mars job.
     """
 
-    def __init__(self, stage_processor: TaskStageProcessor):
-        self._stage_processor = stage_processor
-        self._subtask_graph = stage_processor.subtask_graph
-
-    def evaluate(self):
-        """Here we could implement different acquisitions by state processor
-        configurations.
-        """
-        for subtask in self._subtask_graph.iter_nodes():
-            is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
-            subtask.required_resource = (
-                Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
-            )
+    name = None
+
+    @classmethod
+    @abstractmethod
+    async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
+        # Dispatch to the evaluator class registered under the configured
+        # name, handing it its own sub-config.
+        name = config.get("resource_evaluator", "default")
+        evaluator_config = config.get(name, {})
+        evaluator_cls = _name_to_resource_evaluator[name]
+        return await evaluator_cls.create(evaluator_config, **kwargs)
+
+    @abstractmethod
+    async def evaluate(self, stage_processor: "TaskStageProcessor"):  # noqa: F821
+        """Called before executing a task stage."""
+
+    @abstractmethod
+    async def report(self):
+        """Called after executing a task."""
+
+
+@register_resource_evaluator
+class DefaultEvaluator(ResourceEvaluator):
+    name = "default"
+
+    @classmethod
+    async def create(cls, config, **kwargs) -> "ResourceEvaluator":
+        return cls()
+
+    async def evaluate(self, stage_processor: "TaskStageProcessor"):  # noqa: F821
+        init_default_resource_for_subtask(stage_processor.subtask_graph)
+
+    async def report(self):
+        pass
diff --git a/mars/services/task/execution/mars/tests/__init__.py b/mars/services/task/execution/mars/tests/__init__.py
new file mode 100644
index 0000000000..c71e83c08e
--- /dev/null
+++ b/mars/services/task/execution/mars/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 1999-2021 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
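To make the extension contract in `resource.py` concrete, here is a minimal sketch of a custom evaluator wired in through `register_resource_evaluator`. The `HistoryBasedEvaluator` class, its `hbo` name, and its flat resource guess are hypothetical illustrations; only `ResourceEvaluator`, `register_resource_evaluator`, and `Resource` come from the patch above.

from typing import Any, Dict

from mars.resource import Resource
from mars.services.task.execution.mars.resource import (
    ResourceEvaluator,
    register_resource_evaluator,
)


@register_resource_evaluator
class HistoryBasedEvaluator(ResourceEvaluator):
    # Registered under this name; selected via the `resource_evaluator`
    # config key.
    name = "hbo"

    def __init__(self, config: Dict[str, Any], **kwargs):
        self._config = config
        # kwargs carries session_id, task_id, cluster_api, etc. passed in
        # by the executor's create() above.
        self._kwargs = kwargs

    @classmethod
    async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
        # `config` is this evaluator's own sub-config, already extracted
        # by ResourceEvaluator.create from the execution config.
        return cls(config, **kwargs)

    async def evaluate(self, stage_processor):
        # A real HBO backend would look up per-subtask usage history here;
        # this placeholder assigns a flat two-CPU estimate instead.
        for subtask in stage_processor.subtask_graph.iter_nodes():
            subtask.required_resource = Resource(num_cpus=2)

    async def report(self):
        # Where finished-task statistics would be pushed to the external
        # service; a no-op in this sketch.
        pass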
diff --git a/mars/services/task/execution/mars/tests/test_resource.py b/mars/services/task/execution/mars/tests/test_resource.py
new file mode 100644
index 0000000000..4ed92eb4ac
--- /dev/null
+++ b/mars/services/task/execution/mars/tests/test_resource.py
@@ -0,0 +1,100 @@
+# Copyright 1999-2021 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import pytest
+
+from typing import Dict, Any
+
+from ...... import dataframe as md
+from ...... import tensor as mt
+from .... import Task
+from ......config import Config
+from ......core import Tileable, TileableGraph, ChunkGraphBuilder
+from ......resource import Resource
+from ....analyzer import GraphAnalyzer
+from ..resource import ResourceEvaluator, register_resource_evaluator, DefaultEvaluator
+from ..stage import TaskStageProcessor
+
+
+@register_resource_evaluator
+class MockedEvaluator(ResourceEvaluator):
+    name = "mock"
+
+    def __init__(self, config, **kwargs):
+        self._config = config
+
+    @classmethod
+    async def create(cls, config: Dict[str, Any], **kwargs) -> "ResourceEvaluator":
+        return cls(config, **kwargs)
+
+    async def evaluate(self, stage_processor: "TaskStageProcessor"):
+        pass
+
+    async def report(self):
+        pass
+
+
+def _build_chunk_graph(tileable_graph: TileableGraph):
+    return next(ChunkGraphBuilder(tileable_graph).build())
+
+
+async def _gen_stage_processor(t):
+    tileable_graph = t.build_graph(tile=False)
+    chunk_graph = _build_chunk_graph(tileable_graph)
+    bands = [(f"address_{i}", "numa-0") for i in range(4)]
+    band_resource = dict((band, Resource(num_cpus=1)) for band in bands)
+    task = Task("mock_task", "mock_session", tileable_graph)
+    analyzer = GraphAnalyzer(chunk_graph, band_resource, task, Config(), dict())
+    subtask_graph = analyzer.gen_subtask_graph()
+    stage_processor = TaskStageProcessor(
+        "stage_id", task, chunk_graph, subtask_graph, bands, None, None, None
+    )
+    return stage_processor
+
+
+async def _test_default_evaluator(config: Dict[str, Any], t: Tileable):
+    resource_evaluator = await ResourceEvaluator.create(config)
+    assert resource_evaluator is not None
+    assert isinstance(resource_evaluator, DefaultEvaluator)
+    stage_processor = await _gen_stage_processor(t)
+    await resource_evaluator.evaluate(stage_processor)
+    for subtask in stage_processor.subtask_graph.iter_nodes():
+        is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
+        assert subtask.required_resource == (
+            Resource(num_gpus=1) if is_gpu else Resource(num_cpus=1)
+        )
+    assert await resource_evaluator.report() is None
+
+
+@pytest.mark.asyncio
+async def test_resource_evaluator():
+    # test mocked resource evaluator
+    resource_evaluator = await ResourceEvaluator.create({"resource_evaluator": "mock"})
+    assert resource_evaluator is not None
+    assert isinstance(resource_evaluator, MockedEvaluator)
+
+    # test default resource evaluator
+    t = mt.ones((10, 10), chunk_size=5) + 1
+    await _test_default_evaluator({}, t)
+    await _test_default_evaluator({"resource_evaluator": "default"}, t)
+    df = md.DataFrame(
+        np.random.randint(0, 100, size=(100_000, 4)),
+        columns=list("ABCD"),
+        chunk_size=1000,
+    )
+    df = df[df["A"] > 50]
+    await _test_default_evaluator({}, df)
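For the dispatch itself, a quick usage sketch mirroring the tests: the top-level `resource_evaluator` key selects a registered name (falling back to `default`), and a nested key of the same name supplies that evaluator's sub-config. The dict literals below are illustrative; in a deployment these values would come from the execution section of `base_config.yml`.

import asyncio

from mars.services.task.execution.mars.resource import (
    DefaultEvaluator,
    ResourceEvaluator,
)


async def main():
    # No `resource_evaluator` key: falls back to the "default" registration.
    evaluator = await ResourceEvaluator.create({})
    assert isinstance(evaluator, DefaultEvaluator)

    # Explicit selection; the nested "default" dict is handed to the chosen
    # evaluator's create() as its sub-config.
    evaluator = await ResourceEvaluator.create(
        {"resource_evaluator": "default", "default": {}}
    )
    assert isinstance(evaluator, DefaultEvaluator)
    await evaluator.report()  # no-op for DefaultEvaluator


asyncio.run(main())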