Commit 895183e
Possible fix to dataset cache locks (allegroai#671)
mralgos committed Jun 3, 2022
1 parent fe791e6 commit 895183e
Showing 2 changed files with 19 additions and 22 deletions.
clearml/datasets/dataset.py: 12 changes (10 additions, 2 deletions)
@@ -1692,7 +1692,12 @@ def _create_ds_target_folder(self, part=None, num_parts=None, lock_target_folder
         if lock_target_folder:
             cache.lock_cache_folder(local_folder)
         local_folder.mkdir(parents=True, exist_ok=True)
-        return local_folder
+        return local_folder, cache
+
+    def _release_lock_ds_target_folder(self, target_folder):
+        # type: (Path) -> None
+        cache = CacheManager.get_cache_manager(cache_context=self.__cache_context)
+        cache.unlock_cache_folder(target_folder)

     def _get_data_artifact_names(self):
         # type: () -> List[str]
@@ -1746,7 +1751,7 @@ def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, n
             num_parts = self.get_num_chunks()

         # just create the dataset target folder
-        target_base_folder = self._create_ds_target_folder(
+        target_base_folder, cache = self._create_ds_target_folder(
             part=part, num_parts=num_parts, lock_target_folder=True)

         # selected specific chunks if `part` was passed
@@ -1756,6 +1761,7 @@ def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, n
         if target_base_folder and next(target_base_folder.iterdir(), None):
             if self._verify_dataset_folder(target_base_folder, part, chunk_selection):
                 target_base_folder.touch()
+                self._release_lock_ds_target_folder(target_base_folder)
                 return target_base_folder.as_posix()
             else:
                 LoggerRoot.get_base_logger().info('Dataset needs refreshing, fetching all parent datasets')
@@ -1784,6 +1790,7 @@ def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, n

         # if we have no dependencies, we can just return now
         if not dependencies_by_order:
+            self._release_lock_ds_target_folder(target_base_folder)
             return target_base_folder.absolute().as_posix()

         # extract parent datasets
@@ -1801,6 +1808,7 @@ def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, n
                 chunk_selection=chunk_selection, use_soft_links=use_soft_links,
                 raise_on_error=raise_on_error, force=True)

+        self._release_lock_ds_target_folder(target_base_folder)
         return target_base_folder.absolute().as_posix()

     def _get_dependencies_by_order(self, include_unused=False, include_current=True):
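
For context, a minimal usage sketch of the path this file's changes sit on. Dataset.get() and get_local_copy() are the public clearml API; that get_local_copy() ultimately reaches _merge_datasets() is an assumption read off the diff, and the dataset id is a placeholder:

    # A minimal sketch, assuming the public clearml API; the dataset id is a
    # placeholder. Comments describe the behavior this commit is aiming for.
    from clearml import Dataset

    ds = Dataset.get(dataset_id="<your-dataset-id>")

    # get_local_copy() merges the dataset chunks into a locked cache folder
    # (presumably via _merge_datasets above). Before this commit the folder
    # lock was never released, so a second process requesting the same
    # dataset could block on the cache; now every return path of
    # _merge_datasets releases the lock before handing back the path.
    local_path = ds.get_local_copy()
    print(local_path)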
clearml/storage/cache.py: 29 changes (9 additions, 20 deletions)
@@ -261,26 +261,15 @@ def lock_cache_folder(self, local_path):
             atexit.register(self._lock_file_cleanup_callback)

         lock = self._folder_locks.get(local_path.as_posix())
-        i = 0
-        # try to create a lock if we do not already have one (if we do, we assume it is locked)
-        while not lock:
-            lock_path = local_path.parent / "{}{:03d}.{}{}".format(
-                CacheManager._lockfile_prefix,
-                i,
-                local_path.name,
-                CacheManager._lockfile_suffix,
-            )
-            lock = FileLock(filename=lock_path)
-
-            # try to lock folder (if we failed to create lock, try nex number)
-            try:
-                lock.acquire(timeout=0)
-                break
-            except LockException:
-                # failed locking, maybe someone else already locked it.
-                del lock
-                lock = None
-                i += 1
+        # try to create a lock. If it exists, wait for the lock to be released
+        lock_path = local_path.parent / "{}{:03d}.{}{}".format(
+            CacheManager._lockfile_prefix,
+            0,
+            local_path.name,
+            CacheManager._lockfile_suffix,
+        )
+        lock = FileLock(filename=lock_path)
+        lock.acquire(timeout=300, check_interval=1.0)

         # store lock
         self._folder_locks[local_path.as_posix()] = lock
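
To make the new locking semantics concrete: instead of probing numbered lock files (000, 001, ...) with a zero timeout and falling through to the next free number, every caller now targets lock file number 000 and blocks until the current holder releases it, up to 300 seconds with a 1-second poll. The sketch below illustrates those semantics with only the standard library; SimpleFolderLock is a hypothetical stand-in, not clearml's FileLock:

    import os
    import time

    class SimpleFolderLock:
        # Hypothetical stand-in for clearml's FileLock, illustrating the
        # blocking-acquire semantics this commit switches to: one fixed
        # lock file per folder, polled until the holder releases it.
        def __init__(self, lock_path):
            self.lock_path = lock_path
            self._fd = None

        def acquire(self, timeout=300, check_interval=1.0):
            deadline = time.monotonic() + timeout
            while True:
                try:
                    # O_CREAT | O_EXCL fails if the lock file already
                    # exists, i.e. another process holds the lock
                    self._fd = os.open(
                        self.lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                    return
                except FileExistsError:
                    if time.monotonic() >= deadline:
                        raise TimeoutError("could not lock %s" % self.lock_path)
                    time.sleep(check_interval)  # wait for release, then retry

        def release(self):
            if self._fd is not None:
                os.close(self._fd)
                os.remove(self.lock_path)
                self._fd = None

One visible trade-off: callers that previously each acquired their own numbered lock file in parallel now serialize on a single lock, which appears to be the point of the fix, since a blocked caller waits for a consistent cache folder instead of racing the process that is still writing it.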
