diff --git a/mars/services/scheduling/supervisor/globalslot.py b/mars/services/scheduling/supervisor/globalslot.py index 07c6c2e5a6..06ab62a44c 100644 --- a/mars/services/scheduling/supervisor/globalslot.py +++ b/mars/services/scheduling/supervisor/globalslot.py @@ -62,6 +62,7 @@ async def __pre_destroy__(self): async def refresh_bands(self): self._band_total_slots = await self._cluster_api.get_all_bands() + @mo.extensible async def apply_subtask_slots( self, band: BandType, diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 79cb9ff8a2..2b401bce71 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -169,6 +169,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) submit_aio_tasks = [] manager_ref = await self._get_manager_ref() + apply_delays = [] + submit_items_list = [] + submitted_bands = [] + for band in bands: band_limit = limit or self._band_slot_nums[band] task_queue = self._band_queues[band] @@ -181,17 +185,40 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) subtask_ids = list(submit_items) if not subtask_ids: continue + + submitted_bands.append(band) + submit_items_list.append(submit_items) + # todo it is possible to provide slot data with more accuracy subtask_slots = [1] * len(subtask_ids) + apply_delays.append( + self._slots_ref.apply_subtask_slots.delay( + band, self._session_id, subtask_ids, subtask_slots + ) + ) + + async with redirect_subtask_errors( + self, + [ + item.subtask + for submit_items in submit_items_list + for item in submit_items.values() + ], + ): + submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch( + *apply_delays + ) + + for band, submit_items, submitted_ids in zip( + submitted_bands, submit_items_list, submitted_ids_list + ): + subtask_ids = list(submit_items) + task_queue = self._band_queues[band] + async with redirect_subtask_errors( self, [item.subtask for item in submit_items.values()] ): - submitted_ids = set( - await self._slots_ref.apply_subtask_slots( - band, self._session_id, subtask_ids, subtask_slots - ) - ) non_submitted_ids = [k for k in submit_items if k not in submitted_ids] if submitted_ids: for stid in subtask_ids: diff --git a/mars/services/scheduling/supervisor/tests/test_queue_balance.py b/mars/services/scheduling/supervisor/tests/test_queue_balance.py index b85e452ac8..821b95cf99 100644 --- a/mars/services/scheduling/supervisor/tests/test_queue_balance.py +++ b/mars/services/scheduling/supervisor/tests/test_queue_balance.py @@ -99,6 +99,7 @@ async def create(cls, address: str, **kw): class MockSlotsActor(mo.Actor): + @mo.extensible def apply_subtask_slots( self, band: Tuple, diff --git a/mars/services/scheduling/supervisor/tests/test_queueing.py b/mars/services/scheduling/supervisor/tests/test_queueing.py index a3b9060023..55904d7b63 100644 --- a/mars/services/scheduling/supervisor/tests/test_queueing.py +++ b/mars/services/scheduling/supervisor/tests/test_queueing.py @@ -33,6 +33,7 @@ def __init__(self): def set_capacity(self, capacity: int): self._capacity = capacity + @mo.extensible def apply_subtask_slots( self, band: Tuple,