From d3c8a2bf5e83a27333f83c27d8d5e5a3055b9bfc Mon Sep 17 00:00:00 2001
From: Keith Wiley
Date: Fri, 13 Dec 2024 14:30:07 -0800
Subject: [PATCH] generate_bulk_skeletons_async() imposes a 10k limit and
 breaks the bulk into batches of 100 to avoid URI-too-long errors.

---
 caveclient/skeletonservice.py | 54 +++++++++++++++++++++++++++--------
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/caveclient/skeletonservice.py b/caveclient/skeletonservice.py
index 013c68e1..77298bbd 100644
--- a/caveclient/skeletonservice.py
+++ b/caveclient/skeletonservice.py
@@ -23,6 +23,9 @@
 
 SERVER_KEY = "skeleton_server_address"
 
+MAX_BULK_ASYNCHRONOUS_SKELETONS = 10000
+BULK_ASYNC_SKELETONS_BATCH_SIZE = 100
+
 
 class NoL2CacheException(Exception):
     def __init__(self, value=""):
@@ -673,20 +676,47 @@ def generate_bulk_skeletons_async(
             )
             skeleton_version = -1
 
-        url = self._build_bulk_async_endpoint(
-            root_ids, datastack_name, skeleton_version
-        )
-        response = self.session.get(url)
-        self.raise_for_status(response, log_warning=log_warning)
+        if isinstance(root_ids, np.ndarray):
+            root_ids = root_ids.tolist()
+        if not isinstance(root_ids, list):
+            raise ValueError(
+                f"root_ids must be a list or numpy array of root_ids, not a {type(root_ids)}"
+            )
 
-        estimated_async_time_secs_upper_bound = float(response.text)
+        if len(root_ids) > MAX_BULK_ASYNCHRONOUS_SKELETONS:
+            logging.warning(
+                f"The number of root_ids exceeds the current limit of {MAX_BULK_ASYNCHRONOUS_SKELETONS}. Only the first {MAX_BULK_ASYNCHRONOUS_SKELETONS} will be processed."
+            )
+            root_ids = root_ids[:MAX_BULK_ASYNCHRONOUS_SKELETONS]
 
-        if verbose_level >= 1:
-            logging.info(
-                f"Queued asynchronous skeleton generation for root_ids: {root_ids}"
+        estimated_async_time_secs_upper_bound_sum = 0
+        for batch in range(0, len(root_ids), BULK_ASYNC_SKELETONS_BATCH_SIZE):
+            rids_one_batch = root_ids[batch : batch + BULK_ASYNC_SKELETONS_BATCH_SIZE]
+
+            url = self._build_bulk_async_endpoint(
+                rids_one_batch, datastack_name, skeleton_version
             )
-            logging.info(
-                f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
+            response = self.session.get(url)
+            self.raise_for_status(response, log_warning=log_warning)
+
+            estimated_async_time_secs_upper_bound = float(response.text)
+            estimated_async_time_secs_upper_bound_sum += (
+                estimated_async_time_secs_upper_bound
             )
 
-        return f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
+            if verbose_level >= 1:
+                logging.info(
+                    f"Queued asynchronous skeleton generation for one batch of root_ids: {rids_one_batch}"
+                )
+                logging.info(
+                    f"Upper estimate to generate one batch of {len(rids_one_batch)} skeletons: {estimated_async_time_secs_upper_bound} seconds"
+                )
+
+        if estimated_async_time_secs_upper_bound_sum < 60:
+            return f"Upper estimate to generate all {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound_sum:.0f} seconds"
+        if estimated_async_time_secs_upper_bound_sum < 3600:
+            return f"Upper estimate to generate all {len(root_ids)} skeletons: {(estimated_async_time_secs_upper_bound_sum / 60):.1f} minutes"
+        # With a 10000 skeleton limit, the maximum time is about 12 hours, so we don't need to check for more than that.
+        if True:  # estimated_async_time_secs_upper_bound_sum < 86400:
+            return f"Upper estimate to generate all {len(root_ids)} skeletons: {(estimated_async_time_secs_upper_bound_sum / 3600):.1f} hours"
+        # return f"Upper estimate to generate all {len(root_ids)} skeletons: {(estimated_async_time_secs_upper_bound_sum / 86400):.2f} days"
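
Usage note (not part of the patch): a minimal sketch of how the batched call
might be driven from client code. The datastack name and root IDs below are
placeholders, and the client is assumed to be already authenticated. With this
change, inputs longer than 10000 IDs are truncated with a warning, and the
queued work goes out in batches of 100 so that no single GET URI exceeds
typical length limits.

    import numpy as np
    from caveclient import CAVEclient

    # Hypothetical datastack name; substitute a real one.
    client = CAVEclient("minnie65_public")

    # Placeholder root IDs; lists and numpy arrays are both accepted.
    root_ids = np.array([864691135014128278, 864691135014128279])

    # Queues ceil(len(root_ids) / 100) asynchronous requests and returns a
    # human-readable upper-bound time estimate summed over all batches.
    estimate = client.skeleton.generate_bulk_skeletons_async(root_ids)
    print(estimate)
    # e.g. "Upper estimate to generate all 2 skeletons: 30 seconds"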