Skip to content

Commit

Permalink
Use batched request to apply for slots
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Dec 6, 2021
1 parent 4356fa1 commit dcc75b0
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 8 deletions.
1 change: 0 additions & 1 deletion mars/dataframe/contrib/raydataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def __getstate__(self):

# The default __setstate__ will update _MLDataset's __dict__;


else:
_Dataset = None

Expand Down
1 change: 0 additions & 1 deletion mars/dataframe/contrib/raydataset/mldataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def __getstate__(self):

# The default __setstate__ will update _MLDataset's __dict__;


else:
_MLDataset = None

Expand Down
1 change: 1 addition & 0 deletions mars/services/scheduling/supervisor/globalslot.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def __pre_destroy__(self):
async def refresh_bands(self):
self._band_total_slots = await self._cluster_api.get_all_bands()

@mo.extensible
async def apply_subtask_slots(
self,
band: BandType,
Expand Down
37 changes: 32 additions & 5 deletions mars/services/scheduling/supervisor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
submit_aio_tasks = []
manager_ref = await self._get_manager_ref()

apply_delays = []
submit_items_list = []
submitted_bands = []

for band in bands:
band_limit = limit or self._band_slot_nums[band]
task_queue = self._band_queues[band]
Expand All @@ -181,17 +185,40 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
subtask_ids = list(submit_items)
if not subtask_ids:
continue

submitted_bands.append(band)
submit_items_list.append(submit_items)

# todo it is possible to provide slot data with more accuracy
subtask_slots = [1] * len(subtask_ids)

apply_delays.append(
self._slots_ref.apply_subtask_slots.delay(
band, self._session_id, subtask_ids, subtask_slots
)
)

async with redirect_subtask_errors(
self,
[
item.subtask
for submit_items in submit_items_list
for item in submit_items.values()
],
):
submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch(
*apply_delays
)

for band, submit_items, submitted_ids in zip(
submitted_bands, submit_items_list, submitted_ids_list
):
subtask_ids = list(submit_items)
task_queue = self._band_queues[band]

async with redirect_subtask_errors(
self, [item.subtask for item in submit_items.values()]
):
submitted_ids = set(
await self._slots_ref.apply_subtask_slots(
band, self._session_id, subtask_ids, subtask_slots
)
)
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
if submitted_ids:
for stid in subtask_ids:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ async def create(cls, address: str, **kw):


class MockSlotsActor(mo.Actor):
@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down
1 change: 1 addition & 0 deletions mars/services/scheduling/supervisor/tests/test_queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self):
def set_capacity(self, capacity: int):
self._capacity = capacity

@mo.extensible
def apply_subtask_slots(
self,
band: Tuple,
Expand Down
1 change: 0 additions & 1 deletion mars/storage/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __del__(self):
if os.name != "nt" and fd >= 0:
os.close(fd)


except ImportError: # pragma: no cover
# allow shared_memory package to be absent
SharedMemory = SharedMemoryForRead = None
Expand Down

0 comments on commit dcc75b0

Please sign in to comment.