diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index 614cff9d..098ded47 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -116,7 +116,7 @@ class Dataset(object): __default_dataset_version = "1.0.0" __dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}") __preview_max_file_entries = 15000 - __preview_max_size = 5 * 1024 * 1024 + __preview_max_size = 32 * 1024 __min_api_version = "2.20" __hyperparams_section = "Datasets" __datasets_runtime_prop = "datasets" @@ -409,11 +409,13 @@ def add_external_files( dataset_path=None, # type: Optional[str] recursive=True, # type: bool verbose=False, # type: bool + max_workers=None # type: Optional[int] ): - # type: (...) -> () + # type: (...) -> int """ - Adds an external file or a folder to the current dataset. - External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://). + Adds external files or folders to the current dataset. + External file links can be from cloud storage (s3://, gs://, azure://), local / network storage (file://) + or http(s)// files. Calculates file size for each file and compares against parent. A few examples: @@ -436,92 +438,32 @@ def add_external_files( 'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset) :param recursive: If True match all wildcard files recursively :param verbose: If True print to console files added/modified - :return: number of file links added + :param max_workers: The number of threads to add the external files with. Useful when `source_url` is + a sequence. Defaults to the number of logical cores + :return: Number of file links added """ - num_added = 0 self._dirty = True - if not isinstance(source_url, str): - for source_url_ in source_url: - num_added += self.add_external_files( + num_added = 0 + num_modified = 0 + source_url_list = source_url if not isinstance(source_url, str) else [source_url] + max_workers = max_workers or psutil.cpu_count() + futures_ = [] + with ThreadPoolExecutor(max_workers=max_workers) as tp: + for source_url_ in source_url_list: + futures_.append( + tp.submit( + self._add_external_files, source_url_, wildcard=wildcard, dataset_path=dataset_path, recursive=recursive, - verbose=verbose - ) - return num_added - if dataset_path: - dataset_path = dataset_path.lstrip("/") - # noinspection PyBroadException - try: - if StorageManager.exists_file(source_url): - links = [source_url] - else: - if source_url[-1] != "/": - source_url = source_url + "/" - links = StorageManager.list(source_url, return_full_path=True) - except Exception: - self._task.get_logger().report_text( - "Could not list/find remote file(s) when adding {}".format(source_url) - ) - return 0 - num_modified = 0 - for link in links: - relative_path = link[len(source_url):] - if not relative_path: - relative_path = source_url.split("/")[-1] - if not matches_any_wildcard(relative_path, wildcard, recursive=recursive): - continue - try: - relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix() - size = StorageManager.get_file_size_bytes(link, silence_errors=True) - already_added_file = self._dataset_file_entries.get(relative_path) - if relative_path not in self._dataset_link_entries: - if verbose: - self._task.get_logger().report_text( - "External file {} added".format(link), - print_console=False, - ) - self._dataset_link_entries[relative_path] = LinkEntry( - link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size - ) - num_added += 1 - elif already_added_file and already_added_file.size != size: - if verbose: - self._task.get_logger().report_text( - "External file {} modified".format(link), - print_console=False, - ) - del self._dataset_file_entries[relative_path] - self._dataset_link_entries[relative_path] = LinkEntry( - link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size - ) - num_modified += 1 - elif ( - relative_path in self._dataset_link_entries - and self._dataset_link_entries[relative_path].size != size - ): - if verbose: - self._task.get_logger().report_text( - "External file {} modified".format(link), - print_console=False, - ) - self._dataset_link_entries[relative_path] = LinkEntry( - link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size - ) - num_modified += 1 - else: - if verbose: - self._task.get_logger().report_text( - "External file {} skipped as it was not modified".format(link), - print_console=False, - ) - except Exception as e: - if verbose: - self._task.get_logger().report_text( - "Error '{}' encountered trying to add external file {}".format(e, link), - print_console=False, + verbose=verbose, ) + ) + for future_ in futures_: + num_added_this_call, num_modified_this_call = future_.result() + num_added += num_added_this_call + num_modified += num_modified_this_call self._task.add_tags([self.__external_files_tag]) self._add_script_call( "add_external_files", @@ -712,11 +654,21 @@ def upload( ) total_size += zip_.size chunks_count += 1 + truncated_preview = "" + add_truncated_message = False + truncated_message = "...\ntruncated (too many files to preview)" + for preview_entry in zip_.archive_preview[: Dataset.__preview_max_file_entries]: + truncated_preview += preview_entry + "\n" + if len(truncated_preview) > Dataset.__preview_max_size: + add_truncated_message = True + break + if len(zip_.archive_preview) > Dataset.__preview_max_file_entries: + add_truncated_message = True pool.submit( self._task.upload_artifact, name=artifact_name, artifact_object=Path(zip_path), - preview=zip_.archive_preview, + preview=truncated_preview + (truncated_message if add_truncated_message else ""), delete_after_upload=True, wait_on_upload=True, ) @@ -2972,6 +2924,110 @@ def _get_dependency_chunk_lookup(self): self._dependency_chunk_lookup = self._build_dependency_chunk_lookup() return self._dependency_chunk_lookup + def _add_external_files( + self, + source_url, # type: str + wildcard=None, # type: Optional[Union[str, Sequence[str]]] + dataset_path=None, # type: Optional[str] + recursive=True, # type: bool + verbose=False, # type: bool + ): + # type: (...) -> Tuple[int, int] + """ + Auxiliary function for `add_external_files` + Adds an external file or a folder to the current dataset. + External file links can be from cloud storage (s3://, gs://, azure://) or local / network storage (file://). + Calculates file size for each file and compares against parent. + + :param source_url: Source url link (e.g. s3://bucket/folder/path) + :param wildcard: add only specific set of files. + Wildcard matching, can be a single string or a list of wildcards. + :param dataset_path: The location in the dataset where the file will be downloaded into. + e.g: for source_url='s3://bucket/remote_folder/image.jpg' and dataset_path='s3_files', + 'image.jpg' will be downloaded to 's3_files/image.jpg' (relative path to the dataset) + :param recursive: If True match all wildcard files recursively + :param verbose: If True print to console files added/modified + :return: Number of file links added and modified + """ + if dataset_path: + dataset_path = dataset_path.lstrip("/") + remote_objects = None + # noinspection PyBroadException + try: + if StorageManager.exists_file(source_url): + remote_objects = [StorageManager.get_metadata(source_url)] + elif not source_url.startswith(("http://", "https://")): + if source_url[-1] != "/": + source_url = source_url + "/" + remote_objects = StorageManager.list(source_url, with_metadata=True, return_full_path=True) + except Exception: + pass + if not remote_objects: + self._task.get_logger().report_text( + "Could not list/find remote file(s) when adding {}".format(source_url) + ) + return 0, 0 + num_added = 0 + num_modified = 0 + for remote_object in remote_objects: + link = remote_object.get("name") + relative_path = link[len(source_url):] + if not relative_path: + relative_path = source_url.split("/")[-1] + if not matches_any_wildcard(relative_path, wildcard, recursive=recursive): + continue + try: + relative_path = Path(os.path.join(dataset_path or ".", relative_path)).as_posix() + size = remote_object.get("size") + already_added_file = self._dataset_file_entries.get(relative_path) + if relative_path not in self._dataset_link_entries: + if verbose: + self._task.get_logger().report_text( + "External file {} added".format(link), + print_console=False, + ) + self._dataset_link_entries[relative_path] = LinkEntry( + link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size + ) + num_added += 1 + elif already_added_file and already_added_file.size != size: + if verbose: + self._task.get_logger().report_text( + "External file {} modified".format(link), + print_console=False, + ) + del self._dataset_file_entries[relative_path] + self._dataset_link_entries[relative_path] = LinkEntry( + link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size + ) + num_modified += 1 + elif ( + relative_path in self._dataset_link_entries + and self._dataset_link_entries[relative_path].size != size + ): + if verbose: + self._task.get_logger().report_text( + "External file {} modified".format(link), + print_console=False, + ) + self._dataset_link_entries[relative_path] = LinkEntry( + link=link, relative_path=relative_path, parent_dataset_id=self._id, size=size + ) + num_modified += 1 + else: + if verbose: + self._task.get_logger().report_text( + "External file {} skipped as it was not modified".format(link), + print_console=False, + ) + except Exception as e: + if verbose: + self._task.get_logger().report_text( + "Error '{}' encountered trying to add external file {}".format(e, link), + print_console=False, + ) + return num_added, num_modified + def _build_chunk_selection(self, part, num_parts): # type: (int, int) -> Dict[str, int] """ diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index 897ce7a2..3c105bd4 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -595,8 +595,20 @@ def get_object_size_bytes(self, remote_url, silence_errors=False): :return: The size of the file in bytes. None if the file could not be found or an error occurred. """ - size = None obj = self.get_object(remote_url, silence_errors=silence_errors) + return self._get_object_size_bytes(obj) + + def _get_object_size_bytes(self, obj): + # type: (object, bool) -> [int, None] + """ + Auxiliary function for `get_object_size_bytes`. + Get size of the remote object in bytes. + + :param object obj: The remote object + + :return: The size of the object in bytes. + None if an error occurred. + """ if not obj: return None try: @@ -615,6 +627,21 @@ def get_object_size_bytes(self, remote_url, silence_errors=False): pass return size + def get_object_metadata(self, obj): + # type: (object) -> dict + """ + Get the metadata of the a remote object. + The metadata is a dict containing the following keys: `name`, `size`. + + :param object obj: The remote object + + :return: A dict containing the metadata of the remote object + """ + return { + "name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None, + "size": self._get_object_size_bytes(obj), + } + def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True): """ Verify that this helper can upload files to a folder. @@ -716,12 +743,13 @@ def callback(a_path): res = quote_url(res) return res - def list(self, prefix=None): + def list(self, prefix=None, with_metadata=False): """ List entries in the helper base path. - Return a list of names inside this helper base path. The base path is - determined at creation time and is specific for each storage medium. + Return a list of names inside this helper base path or a list of dictionaries containing + the objects' metadata. The base path is determined at creation time and is specific + for each storage medium. For Google Storage and S3 it is the bucket of the path. For local files it is the root directory. @@ -731,11 +759,14 @@ def list(self, prefix=None): must be a string - the path of a sub directory under the base path. the returned list will include only objects under that subdir. - :return: The paths of all the objects in the storage base - path under prefix. Listed relative to the base path. + :param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries + containing the name and metadata of the remote file. Thus, each dictionary will contain the following + keys: `name`, `size`. + :return: The paths of all the objects in the storage base path under prefix or + a list of dictionaries containing the objects' metadata. + Listed relative to the base path. """ - if prefix: if prefix.startswith(self._base_url): prefix = prefix[len(self.base_url):].lstrip("/") @@ -746,15 +777,22 @@ def list(self, prefix=None): res = self._driver.list_container_objects(self._container) result = [ - obj.name + obj.name if not with_metadata else self.get_object_metadata(obj) for obj in res if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix ] if self._base_url == "file://": - result = [Path(f).as_posix() for f in result] + if not with_metadata: + result = [Path(f).as_posix() for f in result] + else: + for metadata_entry in result: + metadata_entry["name"] = Path(metadata_entry["name"]).as_posix() return result else: - return [obj.name for obj in self._driver.list_container_objects(self._container)] + return [ + obj.name if not with_metadata else self.get_object_metadata(obj) + for obj in self._driver.list_container_objects(self._container) + ] def download_to_file( self, diff --git a/clearml/storage/manager.py b/clearml/storage/manager.py index 7f550143..39449b9b 100644 --- a/clearml/storage/manager.py +++ b/clearml/storage/manager.py @@ -314,15 +314,21 @@ def exists_file(cls, remote_url): :return: True is the remote_url stores a file and False otherwise """ - if remote_url.startswith("file://"): - return os.path.isfile(remote_url[len("file://"):]) - if remote_url.startswith("http://") or remote_url.startswith("https://"): - return requests.head(remote_url).status_code == requests.codes.ok - helper = StorageHelper.get(remote_url) - obj = helper.get_object(remote_url) - if not obj: + # noinspection PyBroadException + try: + if remote_url.endswith("/"): + return False + if remote_url.startswith("file://"): + return os.path.isfile(remote_url[len("file://"):]) + if remote_url.startswith(("http://", "https://")): + return requests.head(remote_url).ok + helper = StorageHelper.get(remote_url) + obj = helper.get_object(remote_url) + if not obj: + return False + return True + except Exception: return False - return len(StorageManager.list(remote_url)) == 0 @classmethod def get_file_size_bytes(cls, remote_url, silence_errors=False): @@ -419,10 +425,11 @@ def download_folder( return local_folder @classmethod - def list(cls, remote_url, return_full_path=False): - # type: (str, bool) -> Optional[List[str]] + def list(cls, remote_url, return_full_path=False, with_metadata=False): + # type: (str, bool) -> Optional[List[Union[str, dict]]] """ - Return a list of object names inside the base path + Return a list of object names inside the base path or dictionaries containing the corresponding + objects' metadata (in case `with_metadata` is True) :param str remote_url: The base path. For Google Storage, Azure and S3 it is the bucket of the path, for local files it is the root directory. @@ -431,17 +438,49 @@ def list(cls, remote_url, return_full_path=False): Azure blob storage: `azure://bucket/folder_` and also file system listing: `/mnt/share/folder_` :param bool return_full_path: If True, return a list of full object paths, otherwise return a list of relative object paths (default False). + :param with_metadata: Instead of returning just the names of the objects, return a list of dictionaries + containing the name and metadata of the remote file. Thus, each dictionary will contain the following + keys: `name`, `size`. + `return_full_path` will modify the name of each dictionary entry to the full path. - :return: The paths of all the objects in the storage base path under prefix, relative to the base path. + :return: The paths of all the objects the storage base path under prefix or the dictionaries containing the objects' metadata, relative to the base path. None in case of list operation is not supported (http and https protocols for example) """ helper = StorageHelper.get(remote_url) try: - names_list = helper.list(prefix=remote_url) + helper_list_result = helper.list(prefix=remote_url, with_metadata=with_metadata) except Exception as ex: LoggerRoot.get_base_logger().warning("Can not list files for '{}' - {}".format(remote_url, ex)) - names_list = None + return None + + prefix = remote_url.rstrip("/") if helper.base_url == "file://" else helper.base_url + if not with_metadata: + return ( + ["{}/{}".format(prefix, name) for name in helper_list_result] + if return_full_path + else helper_list_result + ) + else: + if return_full_path: + for obj in helper_list_result: + obj["name"] = "{}/{}".format(prefix, obj.get("name")) + return helper_list_result + + @classmethod + def get_metadata(cls, remote_url): + # type: (str) -> Optional[dict] + """ + Get the metadata of the a remote object. + The metadata is a dict containing the following keys: `name`, `size`. + + :param str remote_url: Source remote storage location, tree structure of `remote_url` will + be created under the target local_folder. Supports S3/GS/Azure, shared filesystem and http(s). + Example: 's3://bucket/data/' - if helper.base_url == 'file://': - return ["{}/{}".format(remote_url.rstrip('/'), name) for name in names_list] if return_full_path else names_list - return ["{}/{}".format(helper.base_url, name) for name in names_list] if return_full_path else names_list + :return: A dict containing the metadata of the remote object. In case of an error, `None` is returned + """ + helper = StorageHelper.get(remote_url) + obj = helper.get_object(remote_url) + if not obj: + return None + return helper.get_object_metadata(obj) diff --git a/clearml/utilities/parallel.py b/clearml/utilities/parallel.py index ecf79356..2b7c3db0 100644 --- a/clearml/utilities/parallel.py +++ b/clearml/utilities/parallel.py @@ -238,7 +238,7 @@ def __init__( self.fd, self.zip_path = mkstemp(prefix=zip_prefix, suffix=zip_suffix) self.zip_path = Path(self.zip_path) self.zip_file = ZipFile(self.zip_path.as_posix(), "w", allowZip64=allow_zip_64, compression=compression) - self.archive_preview = "" + self.archive_preview = [] self.count = 0 self.files_zipped = set() @@ -259,7 +259,7 @@ def zip(self, file_path, arcname=None): preview_path = arcname if not preview_path: preview_path = file_path - self.archive_preview += "{} - {}\n".format(preview_path, format_size(self.size)) + self.archive_preview.append("{} - {}".format(preview_path, format_size(self.size))) self.files_zipped.add(Path(file_path).as_posix()) if self._chunk_size <= 0 or self.size < self._chunk_size: self._zipper_queue.put(self) @@ -294,7 +294,7 @@ def merge(self, other): parent_zip.writestr(child_name, child_zip.open(child_name).read()) self.files_zipped |= other.files_zipped self.count += other.count - self.archive_preview += other.archive_preview + self.archive_preview.extend(other.archive_preview) def close(self): # type: () -> ()