Limit number of _serialize requests when adding list of links with `add_external_files()` (#813)
allegroai committed Nov 9, 2022
1 parent 8340d4b commit b793f2d
Showing 4 changed files with 247 additions and 114 deletions.
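
To illustrate the change described by the commit title, a minimal usage sketch (the project, dataset, and bucket names below are made up): a single `add_external_files()` call now accepts a sequence of links plus the new `max_workers` argument, so the dataset state is serialized far fewer times than calling the method once per link.

from clearml import Dataset

# All names/URLs below are hypothetical and only illustrate the call shape
dataset = Dataset.create(dataset_name="external_links_demo", dataset_project="examples")

# One call with a sequence of links; max_workers (new in this commit) controls
# how many threads resolve the links in parallel
num_links_added = dataset.add_external_files(
    source_url=[
        "s3://my-bucket/remote_folder/",
        "gs://my-other-bucket/images/",
    ],
    dataset_path="external_files",
    max_workers=4,
)
print("links added:", num_links_added)

dataset.upload()
dataset.finalize()
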
224 changes: 140 additions & 84 deletions clearml/datasets/dataset.py
@@ -116,7 +116,7 @@ class Dataset(object):
__default_dataset_version = "1.0.0"
__dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}")
__preview_max_file_entries = 15000
__preview_max_size = 5 * 1024 * 1024
__preview_max_size = 32 * 1024
__min_api_version = "2.20"
__hyperparams_section = "Datasets"
__datasets_runtime_prop = "datasets"
@@ -409,11 +409,13 @@ def add_external_files(
dataset_path=None, # type: Optional[str]
recursive=True, # type: bool
verbose=False, # type: bool
max_workers=None # type: Optional[int]
):
# type: (...) -> ()
# type: (...) -> int
"""
Adds an external file or a folder to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://).
Adds external files or folders to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://), local / network storage (file://)
or http(s):// files.
Calculates file size for each file and compares against parent.
A few examples:
@@ -436,92 +438,32 @@ def add_external_files(
'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset)
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files added/modified
:return: number of file links added
:param max_workers: The number of threads to add the external files with. Useful when `source_url` is
a sequence. Defaults to the number of logical cores
:return: Number of file links added
"""
num_added = 0
self._dirty = True
if not isinstance(source_url, str):
for source_url_ in source_url:
num_added += self.add_external_files(
num_added = 0
num_modified = 0
source_url_list = source_url if not isinstance(source_url, str) else [source_url]
max_workers = max_workers or psutil.cpu_count()
futures_ = []
with ThreadPoolExecutor(max_workers=max_workers) as tp:
for source_url_ in source_url_list:
futures_.append(
tp.submit(
self._add_external_files,
source_url_,
wildcard=wildcard,
dataset_path=dataset_path,
recursive=recursive,
verbose=verbose
)
return num_added
if dataset_path:
dataset_path = dataset_path.lstrip("/")
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
links = [source_url]
else:
if source_url[-1] != "/":
source_url = source_url + "/"
links = StorageManager.list(source_url, return_full_path=True)
except Exception:
self._task.get_logger().report_text(
"Could not list/find remote file(s) when adding {}".format(source_url)
)
return 0
num_modified = 0
for link in links:
relative_path = link[len(source_url):]
if not relative_path:
relative_path = source_url.split("/")[-1]
if not matches_any_wildcard(relative_path, wildcard, recursive=recursive):
continue
try:
relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix()
size = StorageManager.get_file_size_bytes(link, silence_errors=True)
already_added_file = self._dataset_file_entries.get(relative_path)
if relative_path not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text(
"External file {} added".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_added += 1
elif already_added_file and already_added_file.size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
del self._dataset_file_entries[relative_path]
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
elif (
relative_path in self._dataset_link_entries
and self._dataset_link_entries[relative_path].size != size
):
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
else:
if verbose:
self._task.get_logger().report_text(
"External file {} skipped as it was not modified".format(link),
print_console=False,
)
except Exception as e:
if verbose:
self._task.get_logger().report_text(
"Error '{}' encountered trying to add external file {}".format(e, link),
print_console=False,
verbose=verbose,
)
)
for future_ in futures_:
num_added_this_call, num_modified_this_call = future_.result()
num_added += num_added_this_call
num_modified += num_modified_this_call
self._task.add_tags([self.__external_files_tag])
self._add_script_call(
"add_external_files",
@@ -712,11 +654,21 @@ def upload(
)
total_size += zip_.size
chunks_count += 1
truncated_preview = ""
add_truncated_message = False
truncated_message = "...\ntruncated (too many files to preview)"
for preview_entry in zip_.archive_preview[: Dataset.__preview_max_file_entries]:
truncated_preview += preview_entry + "\n"
if len(truncated_preview) > Dataset.__preview_max_size:
add_truncated_message = True
break
if len(zip_.archive_preview) > Dataset.__preview_max_file_entries:
add_truncated_message = True
pool.submit(
self._task.upload_artifact,
name=artifact_name,
artifact_object=Path(zip_path),
preview=zip_.archive_preview,
preview=truncated_preview + (truncated_message if add_truncated_message else ""),
delete_after_upload=True,
wait_on_upload=True,
)
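
The block above caps each chunk's artifact preview at `__preview_max_file_entries` lines and at `__preview_max_size` bytes (now 32 KB, per the first hunk). A standalone sketch of the same truncation rule, with module-level constants standing in for the private class attributes:

# Sketch only: constants mirror Dataset.__preview_max_file_entries / __preview_max_size
PREVIEW_MAX_FILE_ENTRIES = 15000
PREVIEW_MAX_SIZE = 32 * 1024

def build_truncated_preview(archive_preview):
    # archive_preview: list of per-file description strings for one zip chunk
    truncated_preview = ""
    add_truncated_message = False
    for entry in archive_preview[:PREVIEW_MAX_FILE_ENTRIES]:
        truncated_preview += entry + "\n"
        if len(truncated_preview) > PREVIEW_MAX_SIZE:
            add_truncated_message = True
            break
    if len(archive_preview) > PREVIEW_MAX_FILE_ENTRIES:
        add_truncated_message = True
    if add_truncated_message:
        truncated_preview += "...\ntruncated (too many files to preview)"
    return truncated_preview
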
@@ -2972,6 +2924,110 @@ def _get_dependency_chunk_lookup(self):
self._dependency_chunk_lookup = self._build_dependency_chunk_lookup()
return self._dependency_chunk_lookup

def _add_external_files(
self,
source_url, # type: str
wildcard=None, # type: Optional[Union[str, Sequence[str]]]
dataset_path=None, # type: Optional[str]
recursive=True, # type: bool
verbose=False, # type: bool
):
# type: (...) -> Tuple[int, int]
"""
Auxiliary function for `add_external_files`
Adds an external file or a folder to the current dataset.
External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://).
Calculates file size for each file and compares against parent.
:param source_url: Source url link (e.g. s3://bucket/folder/path)
:param wildcard: add only specific set of files.
Wildcard matching, can be a single string or a list of wildcards.
:param dataset_path: The location in the dataset where the file will be downloaded into.
e.g: for source_url='s3://bucket/remote_folder/image.jpg' and dataset_path='s3_files',
'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset)
:param recursive: If True match all wildcard files recursively
:param verbose: If True print to console files added/modified
:return: Number of file links added and modified
"""
if dataset_path:
dataset_path = dataset_path.lstrip("/")
remote_objects = None
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
remote_objects = [StorageManager.get_metadata(source_url)]
elif not source_url.startswith(("http://", "https://")):
if source_url[-1] != "/":
source_url = source_url + "/"
remote_objects = StorageManager.list(source_url, with_metadata=True, return_full_path=True)
except Exception:
pass
if not remote_objects:
self._task.get_logger().report_text(
"Could not list/find remote file(s) when adding {}".format(source_url)
)
return 0, 0
num_added = 0
num_modified = 0
for remote_object in remote_objects:
link = remote_object.get("name")
relative_path = link[len(source_url):]
if not relative_path:
relative_path = source_url.split("/")[-1]
if not matches_any_wildcard(relative_path, wildcard, recursive=recursive):
continue
try:
relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix()
size = remote_object.get("size")
already_added_file = self._dataset_file_entries.get(relative_path)
if relative_path not in self._dataset_link_entries:
if verbose:
self._task.get_logger().report_text(
"External file {} added".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_added += 1
elif already_added_file and already_added_file.size != size:
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
del self._dataset_file_entries[relative_path]
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
elif (
relative_path in self._dataset_link_entries
and self._dataset_link_entries[relative_path].size != size
):
if verbose:
self._task.get_logger().report_text(
"External file {} modified".format(link),
print_console=False,
)
self._dataset_link_entries[relative_path] = LinkEntry(
link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size
)
num_modified += 1
else:
if verbose:
self._task.get_logger().report_text(
"External file {} skipped as it was not modified".format(link),
print_console=False,
)
except Exception as e:
if verbose:
self._task.get_logger().report_text(
"Error '{}' encountered trying to add external file {}".format(e, link),
print_console=False,
)
return num_added, num_modified

def _build_chunk_selection(self, part, num_parts):
# type: (int, int) -> Dict[str, int]
"""
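
The new `_add_external_files()` helper above takes each object's size from the metadata returned by a single listing call instead of issuing a per-link `StorageManager.get_file_size_bytes()` request as the old code did. A hedged sketch of that listing flow (the bucket URL is made up):

from clearml import StorageManager

# Hypothetical source; StorageManager.get_metadata() and list(with_metadata=True)
# are the calls used by the new helper above
source_url = "s3://my-bucket/remote_folder/"
if StorageManager.exists_file(source_url):
    remote_objects = [StorageManager.get_metadata(source_url)]
else:
    remote_objects = StorageManager.list(source_url, with_metadata=True, return_full_path=True)

for remote_object in remote_objects or []:
    # Each entry is a dict carrying the object's full "name" and its "size" in bytes
    print(remote_object.get("name"), remote_object.get("size"))
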
58 changes: 48 additions & 10 deletions clearml/storage/helper.py
@@ -595,8 +595,20 @@ def get_object_size_bytes(self, remote_url, silence_errors=False):
:return: The size of the file in bytes.
None if the file could not be found or an error occurred.
"""
size = None
obj = self.get_object(remote_url, silence_errors=silence_errors)
return self._get_object_size_bytes(obj)

def _get_object_size_bytes(self, obj):
# type: (object) -> Optional[int]
"""
Auxiliary function for `get_object_size_bytes`.
Get size of the remote object in bytes.
:param object obj: The remote object
:return: The size of the object in bytes.
None if an error occurred.
"""
if not obj:
return None
try:
@@ -615,6 +627,21 @@ def get_object_size_bytes(self, remote_url, silence_errors=False):
pass
return size

def get_object_metadata(self, obj):
# type: (object) -> dict
"""
Get the metadata of a remote object.
The metadata is a dict containing the following keys: `name`, `size`.
:param object obj: The remote object
:return: A dict containing the metadata of the remote object
"""
return {
"name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None,
"size": self._get_object_size_bytes(obj),
}

def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
"""
Verify that this helper can upload files to a folder.
@@ -716,12 +743,13 @@ def callback(a_path):
res = quote_url(res)
return res

def list(self, prefix=None):
def list(self, prefix=None, with_metadata=False):
"""
List entries in the helper base path.
Return a list of names inside this helper base path. The base path is
determined at creation time and is specific for each storage medium.
Return a list of names inside this helper base path or a list of dictionaries containing
the objects' metadata. The base path is determined at creation time and is specific
for each storage medium.
For Google Storage and S3 it is the bucket of the path.
For local files it is the root directory.
@@ -731,11 +759,14 @@ def list(self, prefix=None):
must be a string - the path of a sub directory under the base path.
the returned list will include only objects under that subdir.
:return: The paths of all the objects in the storage base
path under prefix. Listed relative to the base path.
:param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries
containing the name and metadata of the remote file. Thus, each dictionary will contain the following
keys: `name`, `size`.
:return: The paths of all the objects in the storage base path under prefix or
a list of dictionaries containing the objects' metadata.
Listed relative to the base path.
"""

if prefix:
if prefix.startswith(self._base_url):
prefix = prefix[len(self.base_url):].lstrip("/")
@@ -746,15 +777,22 @@ def list(self, prefix=None):
res = self._driver.list_container_objects(self._container)

result = [
obj.name
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in res
if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix
]
if self._base_url == "file://":
result = [Path(f).as_posix() for f in result]
if not with_metadata:
result = [Path(f).as_posix() for f in result]
else:
for metadata_entry in result:
metadata_entry["name"] = Path(metadata_entry["name"]).as_posix()
return result
else:
return [obj.name for obj in self._driver.list_container_objects(self._container)]
return [
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in self._driver.list_container_objects(self._container)
]

def download_to_file(
self,
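
A short usage sketch of the extended `list()` API (the bucket URL and prefix are hypothetical; `StorageHelper.get()` is assumed to resolve the matching storage backend for the URL):

from clearml.storage.helper import StorageHelper

helper = StorageHelper.get("s3://my-bucket/")  # hypothetical bucket

# Default behaviour: a list of object names relative to the helper base path
names = helper.list(prefix="remote_folder/")

# New behaviour: a list of dicts, each carrying the object's "name" and "size"
objects = helper.list(prefix="remote_folder/", with_metadata=True)
for obj in objects:
    print(obj["name"], obj["size"])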