diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index 4b8b422799..2164e43770 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -189,7 +189,11 @@ async def _prepare_input_data(self, subtask: Subtask, band_name: str): if shuffle_queries: # TODO(hks): The batch method doesn't accept different error arguments, # combine them when it can. - await storage_api.fetch.batch(*shuffle_queries) + + # shuffle data fetch from remote won't be recorded in meta, + # thus they are not tracked by lifecycle service, + # here return remote mapper keys to remove them later. + return await storage_api.fetch.batch(*shuffle_queries) async def _collect_input_sizes( self, subtask: Subtask, supervisor_address: str, band_name: str @@ -336,7 +340,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): subtask, subtask_info, self._prepare_input_data, subtask, band_name ) ) - await asyncio.wait_for( + remote_mapper_keys = await asyncio.wait_for( prepare_data_task, timeout=self._data_prepare_timeout ) @@ -353,6 +357,16 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): subtask_info.result = await self._retry_run_subtask( subtask, band_name, subtask_api, batch_quota_req ) + if remote_mapper_keys: + storage_api = await StorageAPI.create( + subtask.session_id, address=self.address, band_name=band_name + ) + await storage_api.delete.batch( + *[ + storage_api.delete.delay(key, error="ignore") + for key in remote_mapper_keys + ] + ) except: # noqa: E722 # pylint: disable=bare-except _fill_subtask_result_with_exception(subtask, subtask_info) finally: diff --git a/mars/services/storage/handler.py b/mars/services/storage/handler.py index 2d3dfe9a3d..3960cc374b 100644 --- a/mars/services/storage/handler.py +++ b/mars/services/storage/handler.py @@ -551,16 +551,18 @@ async def fetch_batch( append_bands_delays = [] for data_key in fetch_keys: - # meta service records main keys only, - # so we append band to the main key - main_key = data_key[0] if isinstance(data_key, tuple) else data_key + # skip shuffle keys + if isinstance(data_key, tuple): + continue append_bands_delays.append( meta_api.add_chunk_bands.delay( - main_key, + data_key, [(self.address, self._band_name)], ) ) - await meta_api.add_chunk_bands.batch(*append_bands_delays) + if append_bands_delays: + await meta_api.add_chunk_bands.batch(*append_bands_delays) + return fetch_keys async def request_quota_with_spill(self, level: StorageLevel, size: int): if await self._quota_refs[level].request_quota(size):