Skip to content

Commit

Permalink
[BACKPORT] Use batched request to apply for slots (#2601) (#2615)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi authored Dec 13, 2021
1 parent a64b30d commit b06bde5
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions mars/services/scheduling/supervisor/globalslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def __pre_destroy__(self):
async def refresh_bands(self):
self._band_total_slots = await self._cluster_api.get_all_bands()

@mo.extensible
async def apply_subtask_slots(
self,
band: BandType,
Expand Down
37 changes: 32 additions & 5 deletions mars/services/scheduling/supervisor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
submit_aio_tasks = []
manager_ref = await self._get_manager_ref()

apply_delays = []
submit_items_list = []
submitted_bands = []

for band in bands:
band_limit = limit or self._band_slot_nums[band]
task_queue = self._band_queues[band]
Expand All @@ -181,17 +185,40 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
subtask_ids = list(submit_items)
if not subtask_ids:
continue

submitted_bands.append(band)
submit_items_list.append(submit_items)

# todo it is possible to provide slot data with more accuracy
subtask_slots = [1] * len(subtask_ids)

apply_delays.append(
self._slots_ref.apply_subtask_slots.delay(
band, self._session_id, subtask_ids, subtask_slots
)
)

async with redirect_subtask_errors(
self,
[
item.subtask
for submit_items in submit_items_list
for item in submit_items.values()
],
):
submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch(
*apply_delays
)

for band, submit_items, submitted_ids in zip(
submitted_bands, submit_items_list, submitted_ids_list
):
subtask_ids = list(submit_items)
task_queue = self._band_queues[band]

async with redirect_subtask_errors(
self, [item.subtask for item in submit_items.values()]
):
submitted_ids = set(
await self._slots_ref.apply_subtask_slots(
band, self._session_id, subtask_ids, subtask_slots
)
)
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
if submitted_ids:
for stid in subtask_ids:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ async def create(cls, address: str, **kw):


class MockSlotsActor(mo.Actor):
@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down
1 change: 1 addition & 0 deletions mars/services/scheduling/supervisor/tests/test_queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self):
def set_capacity(self, capacity: int):
self._capacity = capacity

@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down

0 comments on commit b06bde5

Please sign in to comment.