Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize mars supervisor scheduling #2325

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions mars/services/lifecycle/supervisor/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

from collections import defaultdict
from typing import Dict, List, Optional

Expand Down Expand Up @@ -80,7 +82,8 @@ def _get_remove_chunk_keys(self, chunk_keys: List[str]):

async def decref_chunks(self, chunk_keys: List[str]):
to_remove_chunk_keys = self._get_remove_chunk_keys(chunk_keys)
return self._remove_chunks(to_remove_chunk_keys)
# make _remove_chunks release actor lock so that multiple `decref_chunks` can run concurrently.
yield self._remove_chunks(to_remove_chunk_keys)

async def _remove_chunks(self, to_remove_chunk_keys: List[str]):
if not to_remove_chunk_keys:
Expand Down Expand Up @@ -117,8 +120,8 @@ async def _remove_chunks(self, to_remove_chunk_keys: List[str]):
storage_api = await StorageAPI.create(self._session_id, band[0], band[1])
storage_api_to_deletes[storage_api].append(
storage_api.delete.delay(key, error='ignore'))
for storage_api, deletes in storage_api_to_deletes.items():
await storage_api.delete.batch(*deletes)
await asyncio.gather(*[storage_api.delete.batch(*deletes)
for storage_api, deletes in storage_api_to_deletes.items()])

# delete meta
delete_metas = []
Expand Down Expand Up @@ -157,7 +160,7 @@ async def decref_tileables(self, tileable_keys: List[str]):
self._tileable_ref_counts[tileable_key] -= 1

decref_chunk_keys.extend(self._tileable_key_to_chunk_keys[tileable_key])
return await self.decref_chunks(decref_chunk_keys)
yield self.decref_chunks(decref_chunk_keys)

def get_tileable_ref_counts(self, tileable_keys: List[str]) -> List[int]:
return [self._tileable_ref_counts[tileable_key]
Expand Down
8 changes: 8 additions & 0 deletions mars/services/task/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from collections import deque
from typing import Dict, List, Tuple, Type, Union

Expand All @@ -24,6 +26,8 @@
from .assigner import AbstractGraphAssigner, GraphAssigner
from .fusion import Fusion

logger = logging.getLogger(__name__)


class GraphAnalyzer:
def __init__(self,
Expand Down Expand Up @@ -259,11 +263,15 @@ def gen_subtask_graph(self) -> SubtaskGraph:
# assign expect workers
cur_assigns = {op.key: self._to_band(op.expect_worker)
for op in start_ops if op.expect_worker is not None}
logger.info('Start to assign %s start chunks.', len(start_ops))
chunk_to_bands = assigner.assign(cur_assigns=cur_assigns)
logger.info('Assigned %s start chunks.', len(start_ops))

# fuse node
if self._fuse_enabled:
logger.info('Start to fuse chunks.')
chunk_to_bands = self._fuse(chunk_to_bands)
logger.info('Fused chunks.')

subtask_graph = SubtaskGraph()
chunk_to_priorities = dict()
Expand Down
11 changes: 5 additions & 6 deletions mars/services/task/analyzer/assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,24 @@ def _calc_band_assign_limits(self,
pos = (pos + 1) % len(counts)
return dict(zip(bands, counts))

def _assign_by_dfs(self,
def _assign_by_bfs(self,
start: ChunkData,
band: BandType,
initial_sizes: Dict[BandType, int],
spread_limits: Dict[BandType, float],
key_to_assign: Set[str],
assigned_record: Dict[str, int]):
"""
Assign initial nodes using breath-first search givin initial sizes and
Assign initial nodes using breadth-first search given initial sizes and
limitations of spread range.
"""
if initial_sizes[band] <= 0:
return

graph = self._chunk_graph
if self._undirected_chunk_graph is None:
undirected_chunk_graph = graph.build_undirected()
else:
undirected_chunk_graph = self._undirected_chunk_graph
self._undirected_chunk_graph = graph.build_undirected()
undirected_chunk_graph = self._undirected_chunk_graph

assigned = 0
spread_range = 0
Expand Down Expand Up @@ -187,7 +186,7 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType
cur = sorted_candidates.pop()
while cur.op.key in cur_assigns:
cur = sorted_candidates.pop()
self._assign_by_dfs(cur, band, band_quotas, spread_ranges,
self._assign_by_bfs(cur, band, band_quotas, spread_ranges,
op_keys, cur_assigns)

key_to_assign = \
Expand Down
8 changes: 7 additions & 1 deletion mars/services/task/supervisor/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


import asyncio
import logging
from functools import partial
from typing import Callable, Dict, List, Iterable, Set, Tuple

Expand All @@ -29,6 +30,8 @@
from ..analyzer import GraphAnalyzer
from ..core import Task

logger = logging.getLogger(__name__)


class CancellableTiler(Tiler):
def __init__(self,
Expand Down Expand Up @@ -147,9 +150,12 @@ def tile(self, tileable_graph: TileableGraph) -> Iterable[ChunkGraph]:
def analyze(self,
chunk_graph: ChunkGraph,
available_bands: Dict[BandType, int]) -> SubtaskGraph:
logger.info('Start to gen subtask graph.')
task = self._task
analyzer = GraphAnalyzer(chunk_graph, available_bands, task)
return analyzer.gen_subtask_graph()
graph = analyzer.gen_subtask_graph()
logger.info('Generated subtask graph of %s subtasks.', len(graph))
return graph

def _get_done(self):
return self._done.is_set()
Expand Down
5 changes: 4 additions & 1 deletion mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
stage_processor.subtask_temp_result[subtask] = subtask_result
if subtask_result.status.is_done:
try:
await self._decref_input_subtasks(subtask, stage_processor.subtask_graph)
# Since every worker will call supervisor to set subtask result,
# we need to release actor lock to make `decref_chunks` parallel to avoid blocking
# other `set_subtask_result` calls.
yield self._decref_input_subtasks(subtask, stage_processor.subtask_graph)
except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover
_, err, tb = sys.exc_info()
if subtask_result.status not in (SubtaskStatus.errored, SubtaskStatus.cancelled):
Expand Down