Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip adding band in meta when fetch shuffle data #2922

Merged
merged 1 commit into from
Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions mars/services/scheduling/worker/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ async def _prepare_input_data(self, subtask: Subtask, band_name: str):
if shuffle_queries:
# TODO(hks): The batch method doesn't accept different error arguments,
# combine them when it can.
await storage_api.fetch.batch(*shuffle_queries)

# Shuffle data fetched from remote workers is not recorded in meta,
# so it is not tracked by the lifecycle service.
# Return the remote mapper keys here so that they can be removed later.
return await storage_api.fetch.batch(*shuffle_queries)

async def _collect_input_sizes(
self, subtask: Subtask, supervisor_address: str, band_name: str
Expand Down Expand Up @@ -336,7 +340,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask, subtask_info, self._prepare_input_data, subtask, band_name
)
)
await asyncio.wait_for(
remote_mapper_keys = await asyncio.wait_for(
prepare_data_task, timeout=self._data_prepare_timeout
)

Expand All @@ -353,6 +357,16 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
subtask_info.result = await self._retry_run_subtask(
subtask, band_name, subtask_api, batch_quota_req
)
if remote_mapper_keys:
storage_api = await StorageAPI.create(
subtask.session_id, address=self.address, band_name=band_name
)
await storage_api.delete.batch(
*[
storage_api.delete.delay(key, error="ignore")
for key in remote_mapper_keys
]
)
except: # noqa: E722 # pylint: disable=bare-except
_fill_subtask_result_with_exception(subtask, subtask_info)
finally:
Expand Down
12 changes: 7 additions & 5 deletions mars/services/storage/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,16 +551,18 @@ async def fetch_batch(

append_bands_delays = []
for data_key in fetch_keys:
# meta service records main keys only,
# so we append band to the main key
main_key = data_key[0] if isinstance(data_key, tuple) else data_key
# skip shuffle keys
if isinstance(data_key, tuple):
continue
append_bands_delays.append(
meta_api.add_chunk_bands.delay(
main_key,
data_key,
[(self.address, self._band_name)],
)
)
await meta_api.add_chunk_bands.batch(*append_bands_delays)
if append_bands_delays:
await meta_api.add_chunk_bands.batch(*append_bands_delays)
return fetch_keys

async def request_quota_with_spill(self, level: StorageLevel, size: int):
if await self._quota_refs[level].request_quota(size):
Expand Down