Skip to content

Commit

Permalink
Skip adding band when fetch shuffle data (mars-project#2922)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored and wjsi committed May 5, 2022
1 parent 0918713 commit b0796c7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
18 changes: 16 additions & 2 deletions mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,11 @@ async def _prepare_input_data(self, subtask: Subtask, band_name: str):
if shuffle_queries:
# TODO(hks): The batch method doesn't accept different error arguments,
# combine them when it can.
await storage_api.fetch.batch(*shuffle_queries)

# shuffle data fetch from remote won't be recorded in meta,
# thus they are not tracked by lifecycle service,
# here return remote mapper keys to remove them later.
return await storage_api.fetch.batch(*shuffle_queries)

async def _collect_input_sizes(
self, subtask: Subtask, supervisor_address: str, band_name: str
Expand Down Expand Up @@ -331,7 +335,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask, subtask_info, self._prepare_input_data, subtask, band_name
)
)
await asyncio.wait_for(
remote_mapper_keys = await asyncio.wait_for(
prepare_data_task, timeout=self._data_prepare_timeout
)

Expand All @@ -348,6 +352,16 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask_info.result = await self._retry_run_subtask(
subtask, band_name, subtask_api, batch_quota_req
)
if remote_mapper_keys:
storage_api = await StorageAPI.create(
subtask.session_id, address=self.address, band_name=band_name
)
await storage_api.delete.batch(
*[
storage_api.delete.delay(key, error="ignore")
for key in remote_mapper_keys
]
)
except: # noqa: E722 # pylint: disable=bare-except
_fill_subtask_result_with_exception(subtask, subtask_info)
finally:
Expand Down
7 changes: 6 additions & 1 deletion mars/services/storage/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,12 +539,17 @@ async def fetch_batch(

append_bands_delays = []
for data_key in fetch_keys:
# skip shuffle keys
if isinstance(data_key, tuple):
continue
append_bands_delays.append(
meta_api.add_chunk_bands.delay(
data_key, [(self.address, self._band_name)]
)
)
await meta_api.add_chunk_bands.batch(*append_bands_delays)
if append_bands_delays:
await meta_api.add_chunk_bands.batch(*append_bands_delays)
return fetch_keys

async def request_quota_with_spill(self, level: StorageLevel, size: int):
if await self._quota_refs[level].request_quota(size):
Expand Down

0 comments on commit b0796c7

Please sign in to comment.