Skip to content

Commit

Permalink
sync mode no longer subprocesses a daemon mode
Browse files Browse the repository at this point in the history
Reports progress while waiting for another package to finish copying

Signed-off-by: Ben Andersen <ben@isohedron.com.au>
  • Loading branch information
isohedronpipeline committed Mar 12, 2024
1 parent b5950d6 commit 461adb1
Showing 1 changed file with 103 additions and 14 deletions.
117 changes: 103 additions & 14 deletions src/rez/package_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ class PackageCache(object):
VARIANT_PENDING = 5 #: Variant is pending caching
VARIANT_REMOVED = 6 #: Variant was deleted

# Human-readable description fragments for each VARIANT_* status code.
# Used when composing log messages, appended after the variant's
# "name-version" prefix (e.g. "foo-1.2.3 payload is still being copied
# to this cache").
STATUS_DESCRIPTIONS = {
VARIANT_NOT_FOUND: "was not found",
VARIANT_FOUND: "was found",
VARIANT_CREATED: "was created",
VARIANT_COPYING: "payload is still being copied to this cache",
# Multi-line message: points the user at the docs for recovering a
# cache whose copy operation died part-way through.
VARIANT_COPY_STALLED: "payload copy has stalled.\nSee "
"https://rez.readthedocs.io/en/stable/caching.html#cleaning-the-cache "
"for more information.",
VARIANT_PENDING: "is pending caching",
VARIANT_REMOVED: "was deleted",
}

_FILELOCK_TIMEOUT = 10
_COPYING_TIME_INC = 0.2
_COPYING_TIME_MAX = 5.0
Expand Down Expand Up @@ -116,7 +128,7 @@ def get_cached_root(self, variant):

return rootpath

def add_variant(self, variant, force=False):
def add_variant(self, variant, force=False, wait_for_copying=False, logger=None):
"""Copy a variant's payload into the cache.
The following steps are taken to ensure multi-thread/proc safety, and to
Expand Down Expand Up @@ -147,6 +159,9 @@ def add_variant(self, variant, force=False):
variant (Variant): The variant to copy into this cache
force (bool): Copy the variant regardless. Use at your own risk (there
is no guarantee the resulting variant payload will be functional).
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
logger (None | Logger): If a logger is provided, log information to it.
Returns:
tuple: 2-tuple:
Expand Down Expand Up @@ -214,17 +229,44 @@ def add_variant(self, variant, force=False):
% package.repository
)

no_op_statuses = (
no_op_statuses = {
self.VARIANT_FOUND,
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED
)
self.VARIANT_COPY_STALLED,
}
if not wait_for_copying:
# Copying variants are only no-ops if we want to ignore them.
no_op_statuses.add(self.VARIANT_COPYING)

# variant already exists, or is being copied to cache by another thread/proc
status, rootpath = self._get_cached_root(variant)
if status in no_op_statuses:
if logger:
logger.warning(f"Not caching {variant.name}-{variant.version}. "
f"Variant {self.STATUS_DESCRIPTIONS[status]}")
return (rootpath, status)

if wait_for_copying and status == self.VARIANT_COPYING:
ticks = 0
while status == self.VARIANT_COPYING:
self._print_with_spinner(
f"Waiting for {variant.name}-{variant.version} to finish copying.", ticks)
ticks += 1

time.sleep(self._COPYING_TIME_INC)
status, rootpath = self._get_cached_root(variant)
else:
# Status has changed, so report the change and return
if logger:
if status in no_op_statuses:
logger.warning(f"{variant.name}-{variant.version} "
f"{self.STATUS_DESCRIPTIONS[status]}")
elif status == self.VARIANT_FOUND:
# We have resolved into a satisfactory state
logger.info(f"{variant.name}-{variant.version} {self.STATUS_DESCRIPTIONS[status]}")
else:
logger.warning(f"{variant.name}-{variant.version} {self.STATUS_DESCRIPTIONS[status]}")
return (rootpath, status)

# 1.
path = self._get_hash_path(variant)
safe_makedirs(path)
Expand Down Expand Up @@ -396,14 +438,25 @@ def add_variants(self, variants, package_cache_async=True):
)

variants_ = []
cachable_statuses = {
self.VARIANT_NOT_FOUND,
}
if not package_cache_async:
# We want to monitor copying variants if we're synchronous.
# We also want to report that a status has been stalled, so we'll
# hand that off to the caching function as well
cachable_statuses.update({
self.VARIANT_COPYING,
self.VARIANT_COPY_STALLED,
})

# trim down to those variants that are cachable, and not already cached
for variant in variants:
if not variant.parent.is_cachable:
continue

status, _ = self._get_cached_root(variant)
if status == self.VARIANT_NOT_FOUND:
if status in cachable_statuses:
variants_.append(variant)

# if there are no variants to add, and no potential cleanup to do, then exit
Expand Down Expand Up @@ -444,6 +497,20 @@ def add_variants(self, variants, package_cache_async=True):
with open(filepath, 'w') as f:
f.write(json.dumps(handle_dict))

if package_cache_async:
self._subprocess_package_caching_daemon(self.path)
else:
# synchronous caching
self.run_caching_operation(wait_for_copying=True)

@staticmethod
def _subprocess_package_caching_daemon(path):
"""
Run the package cache in a daemon process
Returns:
subprocess.Popen : The package caching daemon process
"""
# configure executable
if platform.system() == "Windows":
kwargs = {
Expand All @@ -460,7 +527,7 @@ def add_variants(self, variants, package_cache_async=True):
raise RuntimeError("Did not find rez-pkg-cache executable")

# start caching subproc
args = [exe, "--daemon", self.path]
args = [exe, "--daemon", path]

try:
with open(os.devnull, 'w') as devnull:
Expand All @@ -471,14 +538,12 @@ def add_variants(self, variants, package_cache_async=True):
else:
out_target = devnull

process = subprocess.Popen(
return subprocess.Popen(
args,
stdout=out_target,
stderr=out_target,
**kwargs
)
if not package_cache_async:
process.communicate()

except Exception as e:
print_warning(
Expand Down Expand Up @@ -577,6 +642,15 @@ def run_daemon(self):
if pid > 0:
sys.exit(0)

self.run_caching_operation(wait_for_copying=False)

def run_caching_operation(self, wait_for_copying=False):
"""Copy pending variants.
Args:
wait_for_copying (bool): Whether the caching step should block when one of the
pending variants is marked as already copying.
"""
logger = self._init_logging()

# somewhere for the daemon to store stateful info
Expand All @@ -587,7 +661,7 @@ def run_daemon(self):
# copy variants into cache
try:
while True:
keep_running = self._run_daemon_step(state)
keep_running = self._run_caching_step(state, wait_for_copying=wait_for_copying)
if not keep_running:
break
except Exception:
Expand Down Expand Up @@ -701,12 +775,13 @@ def _lock(self):
except NotLocked:
pass

def _run_daemon_step(self, state):
def _run_caching_step(self, state, wait_for_copying=False):
logger = state["logger"]

# pick a random pending variant to copy
pending_filenames = set(os.listdir(self._pending_dir))
pending_filenames -= set(state.get("copying", set()))
if not wait_for_copying:
pending_filenames -= set(state.get("copying", set()))
if not pending_filenames:
return False

Expand All @@ -729,7 +804,11 @@ def _run_daemon_step(self, state):
t = time.time()

try:
rootpath, status = self.add_variant(variant)
rootpath, status = self.add_variant(
variant,
wait_for_copying=wait_for_copying,
logger=logger,
)

except PackageCacheError as e:
# variant cannot be cached, so remove as a pending variant
Expand Down Expand Up @@ -876,3 +955,13 @@ def _get_hash_path(self, variant):
dirs.append(hash_dirname)

return os.path.join(*dirs)

@staticmethod
def _print_with_spinner(message, ticks):
"""
Report a message with a spinner wheel to indicate progress.
"""
wheel = "⣾⣽⣻⢿⡿⣟⣯⣷"
ticks = ticks % len(wheel)
spinner = wheel[ticks:1 + ticks]
print(f" {spinner} {message}", end="\r")

0 comments on commit 461adb1

Please sign in to comment.