From 1525fb06d82d0eee6c8d43cb07ae3ef63f7b1321 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 11 Aug 2021 18:12:02 +0800 Subject: [PATCH 1/5] make subtask graph analyze async --- mars/services/task/supervisor/preprocessor.py | 8 +++++++- mars/services/task/supervisor/processor.py | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/mars/services/task/supervisor/preprocessor.py b/mars/services/task/supervisor/preprocessor.py index c1caef4d6b..170dfa623b 100644 --- a/mars/services/task/supervisor/preprocessor.py +++ b/mars/services/task/supervisor/preprocessor.py @@ -14,6 +14,7 @@ import asyncio +import logging from functools import partial from typing import Callable, Dict, List, Iterable, Set, Tuple @@ -29,6 +30,8 @@ from ..analyzer import GraphAnalyzer from ..core import Task +logger = logging.getLogger(__name__) + class CancellableTiler(Tiler): def __init__(self, @@ -147,9 +150,12 @@ def tile(self, tileable_graph: TileableGraph) -> Iterable[ChunkGraph]: def analyze(self, chunk_graph: ChunkGraph, available_bands: Dict[BandType, int]) -> SubtaskGraph: + logger.info('Start to gen subtask graph.') task = self._task analyzer = GraphAnalyzer(chunk_graph, available_bands, task) - return analyzer.gen_subtask_graph() + graph = analyzer.gen_subtask_graph() + logger.info('Generated subtask graph of %s subtasks.', len(graph)) + return graph def _get_done(self): return self._done.is_set() diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index d87cd852eb..73e353ce84 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -254,8 +254,8 @@ async def get_next_stage_processor(self) \ # gen subtask graph available_bands = await self._get_available_band_slots() - subtask_graph = self._preprocessor.analyze( - chunk_graph, available_bands) + subtask_graph = await asyncio.to_thread(self._preprocessor.analyze, + chunk_graph, available_bands) stage_processor = TaskStageProcessor( new_task_id(), self._task, chunk_graph, subtask_graph, list(available_bands), self._get_chunk_optimization_records(), From 690acbd9764b7523c4a9806ecb381e78f662f1de Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 11 Aug 2021 18:17:19 +0800 Subject: [PATCH 2/5] make decref chunks parallel --- mars/services/lifecycle/supervisor/tracker.py | 9 ++++++--- mars/services/task/supervisor/processor.py | 5 ++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/mars/services/lifecycle/supervisor/tracker.py b/mars/services/lifecycle/supervisor/tracker.py index 7cd461d696..391ca4526f 100644 --- a/mars/services/lifecycle/supervisor/tracker.py +++ b/mars/services/lifecycle/supervisor/tracker.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio + from collections import defaultdict from typing import Dict, List, Optional @@ -80,7 +82,8 @@ def _get_remove_chunk_keys(self, chunk_keys: List[str]): async def decref_chunks(self, chunk_keys: List[str]): to_remove_chunk_keys = self._get_remove_chunk_keys(chunk_keys) - return self._remove_chunks(to_remove_chunk_keys) + # make _remove_chunks release actor lock so that multiple `decref_chunks` can run concurrently. + yield self._remove_chunks(to_remove_chunk_keys) async def _remove_chunks(self, to_remove_chunk_keys: List[str]): if not to_remove_chunk_keys: @@ -117,8 +120,8 @@ async def _remove_chunks(self, to_remove_chunk_keys: List[str]): storage_api = await StorageAPI.create(self._session_id, band[0], band[1]) storage_api_to_deletes[storage_api].append( storage_api.delete.delay(key, error='ignore')) - for storage_api, deletes in storage_api_to_deletes.items(): - await storage_api.delete.batch(*deletes) + await asyncio.gather(*[storage_api.delete.batch(*deletes) + for storage_api, deletes in storage_api_to_deletes.items()]) # delete meta delete_metas = [] diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 73e353ce84..e9e3503148 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -537,7 +537,10 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): stage_processor.subtask_temp_result[subtask] = subtask_result if subtask_result.status.is_done: try: - await self._decref_input_subtasks(subtask, stage_processor.subtask_graph) + # Since every worker will call supervisor to set subtask result, + # we need to release actor lock to make `decref_chunks` parallel to avoid blocking + # other `set_subtask_result` calls. + yield self._decref_input_subtasks(subtask, stage_processor.subtask_graph) except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover _, err, tb = sys.exc_info() if subtask_result.status not in (SubtaskStatus.errored, SubtaskStatus.cancelled): From 464e11203a2931d3325237db0095df1a3d7a2fcc Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Fri, 13 Aug 2021 13:37:35 +0800 Subject: [PATCH 3/5] optimize graph assigner speed --- mars/services/task/analyzer/analyzer.py | 8 ++++++++ mars/services/task/analyzer/assigner.py | 11 +++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py index e1009b9c24..76817efc26 100644 --- a/mars/services/task/analyzer/analyzer.py +++ b/mars/services/task/analyzer/analyzer.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from collections import deque from typing import Dict, List, Tuple, Type, Union @@ -24,6 +26,8 @@ from .assigner import AbstractGraphAssigner, GraphAssigner from .fusion import Fusion +logger = logging.getLogger(__name__) + class GraphAnalyzer: def __init__(self, @@ -258,11 +262,15 @@ def gen_subtask_graph(self) -> SubtaskGraph: # assign expect workers cur_assigns = {op.key: self._to_band(op.expect_worker) for op in start_ops if op.expect_worker is not None} + logger.info('Start to assign %s start chunks.', len(start_ops)) chunk_to_bands = assigner.assign(cur_assigns=cur_assigns) + logger.info('Assigned %s start chunks.', len(start_ops)) # fuse node if self._fuse_enabled: + logger.info('Start to fuse chunks.') chunk_to_bands = self._fuse(chunk_to_bands) + logger.info('Fused chunks.') subtask_graph = SubtaskGraph() chunk_to_priorities = dict() diff --git a/mars/services/task/analyzer/assigner.py b/mars/services/task/analyzer/assigner.py index 53023511b1..a38519dbe4 100644 --- a/mars/services/task/analyzer/assigner.py +++ b/mars/services/task/analyzer/assigner.py @@ -114,7 +114,7 @@ def _calc_band_assign_limits(self, pos = (pos + 1) % len(counts) return dict(zip(bands, counts)) - def _assign_by_dfs(self, + def _assign_by_bfs(self, start: ChunkData, band: BandType, initial_sizes: Dict[BandType, int], @@ -122,7 +122,7 @@ def _assign_by_dfs(self, key_to_assign: Set[str], assigned_record: Dict[str, int]): """ - Assign initial nodes using breath-first search givin initial sizes and + Assign initial nodes using breath-first search given initial sizes and limitations of spread range. """ if initial_sizes[band] <= 0: @@ -130,9 +130,8 @@ def _assign_by_dfs(self, graph = self._chunk_graph if self._undirected_chunk_graph is None: - undirected_chunk_graph = graph.build_undirected() - else: - undirected_chunk_graph = self._undirected_chunk_graph + self._undirected_chunk_graph = graph.build_undirected() + undirected_chunk_graph = self._undirected_chunk_graph assigned = 0 spread_range = 0 @@ -187,7 +186,7 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType cur = sorted_candidates.pop() while cur.op.key in cur_assigns: cur = sorted_candidates.pop() - self._assign_by_dfs(cur, band, band_quotas, spread_ranges, + self._assign_by_bfs(cur, band, band_quotas, spread_ranges, op_keys, cur_assigns) key_to_assign = \ From bbc40adac387567b11a2e002e20565460c7309fd Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Fri, 13 Aug 2021 14:26:07 +0800 Subject: [PATCH 4/5] release lock for stage_processor.set_subtask_result --- mars/services/task/supervisor/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 30f6dad76d..312c68556c 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -547,7 +547,7 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): subtask_result.status = SubtaskStatus.errored subtask_result.error = err subtask_result.traceback = tb - await stage_processor.set_subtask_result(subtask_result) + yield stage_processor.set_subtask_result(subtask_result) def is_done(self) -> bool: for processor in self._task_id_to_processor.values(): From 0b0edd3f269159e489776dfd727dcd6471fb8573 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Fri, 13 Aug 2021 15:34:04 +0800 Subject: [PATCH 5/5] fix decref_chunks --- mars/services/lifecycle/supervisor/tracker.py | 2 +- mars/services/task/supervisor/processor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mars/services/lifecycle/supervisor/tracker.py b/mars/services/lifecycle/supervisor/tracker.py index 391ca4526f..905eca026f 100644 --- a/mars/services/lifecycle/supervisor/tracker.py +++ b/mars/services/lifecycle/supervisor/tracker.py @@ -160,7 +160,7 @@ async def decref_tileables(self, tileable_keys: List[str]): self._tileable_ref_counts[tileable_key] -= 1 decref_chunk_keys.extend(self._tileable_key_to_chunk_keys[tileable_key]) - return await self.decref_chunks(decref_chunk_keys) + yield self.decref_chunks(decref_chunk_keys) def get_tileable_ref_counts(self, tileable_keys: List[str]) -> List[int]: return [self._tileable_ref_counts[tileable_key] diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 312c68556c..30f6dad76d 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -547,7 +547,7 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): subtask_result.status = SubtaskStatus.errored subtask_result.error = err subtask_result.traceback = tb - yield stage_processor.set_subtask_result(subtask_result) + await stage_processor.set_subtask_result(subtask_result) def is_done(self) -> bool: for processor in self._task_id_to_processor.values():