Skip to content

Commit

Permalink
Skip adding band when fetch shuffle data (mars-project#2922)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored and 不涸 committed Apr 22, 2022
1 parent 64f7fbf commit ae8906e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
18 changes: 16 additions & 2 deletions mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ async def _prepare_input_data(self, subtask: Subtask, band_name: str):
if shuffle_queries:
# TODO(hks): The batch method doesn't accept different error arguments,
# combine them when it can.
await storage_api.fetch.batch(*shuffle_queries)

# shuffle data fetch from remote won't be recorded in meta,
# thus they are not tracked by lifecycle service,
# here return remote mapper keys to remove them later.
return await storage_api.fetch.batch(*shuffle_queries)

async def _collect_input_sizes(
self, subtask: Subtask, supervisor_address: str, band_name: str
Expand Down Expand Up @@ -347,7 +351,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask, subtask_info, self._prepare_input_data, subtask, band_name
)
)
await asyncio.wait_for(
remote_mapper_keys = await asyncio.wait_for(
prepare_data_task, timeout=self._data_prepare_timeout
)

Expand All @@ -364,6 +368,16 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask_info.result = await self._retry_run_subtask(
subtask, band_name, subtask_api, batch_quota_req
)
if remote_mapper_keys:
storage_api = await StorageAPI.create(
subtask.session_id, address=self.address, band_name=band_name
)
await storage_api.delete.batch(
*[
storage_api.delete.delay(key, error="ignore")
for key in remote_mapper_keys
]
)
except: # noqa: E722 # pylint: disable=bare-except
_fill_subtask_result_with_exception(subtask, subtask_info)
finally:
Expand Down
12 changes: 7 additions & 5 deletions mars/services/storage/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,16 +571,18 @@ async def fetch_batch(

append_bands_delays = []
for data_key in fetch_keys:
# meta service records main keys only,
# so we append band to the main key
main_key = data_key[0] if isinstance(data_key, tuple) else data_key
# skip shuffle keys
if isinstance(data_key, tuple):
continue
append_bands_delays.append(
meta_api.add_chunk_bands.delay(
main_key,
data_key,
[(self.address, self._band_name)],
)
)
await async_call(meta_api.add_chunk_bands.batch(*append_bands_delays))
if append_bands_delays:
await meta_api.add_chunk_bands.batch(*append_bands_delays)
return fetch_keys

async def request_quota_with_spill(self, level: StorageLevel, size: int):
if await self._quota_refs[level].request_quota(size):
Expand Down

0 comments on commit ae8906e

Please sign in to comment.