Skip to content

Commit

Permalink
Fix can't add S3 external links (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
allegroai committed Dec 13, 2022
1 parent 62a5ef1 commit 16c8a03
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 33 deletions.
2 changes: 1 addition & 1 deletion clearml/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3016,7 +3016,7 @@ def _add_external_files(
# noinspection PyBroadException
try:
if StorageManager.exists_file(source_url):
remote_objects = [StorageManager.get_metadata(source_url)]
remote_objects = [StorageManager.get_metadata(source_url, return_full_path=True)]
elif not source_url.startswith(("http://", "https://")):
if source_url[-1] != "/":
source_url = source_url + "/"
Expand Down
90 changes: 68 additions & 22 deletions clearml/storage/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def upload_object_via_stream(self, iterator, container, object_name, extra, **kw
pass

@abstractmethod
def list_container_objects(self, container, ex_prefix, **kwargs):
def list_container_objects(self, container, ex_prefix=None, **kwargs):
pass

@abstractmethod
Expand All @@ -100,6 +100,10 @@ def upload_object(self, file_path, container, object_name, extra, **kwargs):
def get_object(self, container_name, object_name, **kwargs):
pass

@abstractmethod
def exists_file(self, container_name, object_name):
pass

@classmethod
def get_file_server_hosts(cls):
if cls._file_server_hosts is None:
Expand Down Expand Up @@ -230,7 +234,6 @@ def get(cls, url, logger=None, **kwargs):
:return: A StorageHelper instance.
"""

# Handle URL substitution etc before locating the correct storage driver
url = cls._canonize_url(url)

Expand Down Expand Up @@ -374,7 +377,7 @@ def __init__(
# if this is not a known scheme assume local file
# url2pathname is specifically intended to operate on (urlparse result).path
# and returns a cross-platform compatible result
new_url = normalize_local_path(url)
new_url = normalize_local_path(url[len("file://"):] if url.startswith("file://") else url)
self._driver = _FileStorageDriver(new_url)
# noinspection PyBroadException
try:
Expand Down Expand Up @@ -638,10 +641,12 @@ def get_object_metadata(self, obj):
:return: A dict containing the metadata of the remote object
"""
return {
"name": obj.name if hasattr(obj, "name") else obj.url if hasattr(obj, "url") else None,
name_fields = ("name", "url", "key", "blob_name")
metadata = {
"size": self._get_object_size_bytes(obj),
"name": next(filter(None, (getattr(obj, f, None) for f in name_fields)), None),
}
return metadata

def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
"""
Expand Down Expand Up @@ -770,18 +775,19 @@ def list(self, prefix=None, with_metadata=False):
"""
if prefix:
if prefix.startswith(self._base_url):
prefix = prefix[len(self.base_url):].lstrip("/")

try:
res = self._driver.list_container_objects(self._container, ex_prefix=prefix)
except TypeError:
res = self._driver.list_container_objects(self._container)

prefix = prefix[len(self._base_url):]
if self._base_url != "file://":
prefix = prefix.lstrip("/")
if self._base_url == "file://":
prefix = prefix.rstrip("/")
if prefix.startswith(str(self._driver.base_path)):
prefix = prefix[len(str(self._driver.base_path)):]
res = self._driver.list_container_objects(self._container, ex_prefix=prefix)
result = [
obj.name if not with_metadata else self.get_object_metadata(obj)
for obj in res
if (obj.name.startswith(prefix) or self._base_url == "file://") and obj.name != prefix
]

if self._base_url == "file://":
if not with_metadata:
result = [Path(f).as_posix() for f in result]
Expand Down Expand Up @@ -1223,6 +1229,12 @@ def close_async_threads():
except Exception:
pass

def exists_file(self, remote_url):
object_name = self._normalize_object_name(remote_url)
return self._driver.exists_file(
container_name=self._container.name if self._container else "", object_name=object_name
)


class _HttpDriver(_Driver):
""" LibCloud http/https adapter (simple, enough for now) """
Expand Down Expand Up @@ -1395,6 +1407,13 @@ def upload_object(self, file_path, container, object_name, extra, callback=None,
return self.upload_object_via_stream(iterator=stream, container=container,
object_name=object_name, extra=extra, callback=callback, **kwargs)

def exists_file(self, container_name, object_name):
# noinspection PyBroadException
try:
return requests.head(container_name + object_name, allow_redirects=True).ok
except Exception:
return False


class _Stream(object):
encoding = None
Expand Down Expand Up @@ -1812,6 +1831,15 @@ def get_direct_access(self, remote_path, **_):
def test_upload(self, test_path, config, **_):
return True

def exists_file(self, container_name, object_name):
obj = self.get_object(container_name, object_name)
# noinspection PyBroadException
try:
obj.load()
except Exception:
return False
return bool(obj)


class _GoogleCloudStorageDriver(_Driver):
"""Storage driver for google cloud storage"""
Expand Down Expand Up @@ -1886,8 +1914,13 @@ def upload_object(self, file_path, container, object_name, extra=None, **kwargs)
return False
return True

def list_container_objects(self, container, **kwargs):
return list(container.bucket.list_blobs())
def list_container_objects(self, container, ex_prefix=None, **kwargs):
# noinspection PyBroadException
try:
return list(container.bucket.list_blobs(prefix=ex_prefix))
except TypeError:
# google-cloud-storage < 1.17
return [blob for blob in container.bucket.list_blobs() if blob.name.startswith(ex_prefix)]

def delete_object(self, object, **kwargs):
try:
Expand Down Expand Up @@ -1957,6 +1990,9 @@ def test_upload(self, test_path, config, **_):
def get_direct_access(self, remote_path, **_):
return None

def exists_file(self, container_name, object_name):
return self.get_object(container_name, object_name).exists()


class _AzureBlobServiceStorageDriver(_Driver):
scheme = "azure"
Expand Down Expand Up @@ -2302,6 +2338,10 @@ def _blob_name_from_object_path(cls, name, container_name):
def get_direct_access(self, remote_path, **_):
return None

def exists_file(self, container_name, object_name):
container = self.get_container(container_name)
return container.exists(container_name, blob_name=object_name)


class _FileStorageDriver(_Driver):
"""
Expand Down Expand Up @@ -2364,8 +2404,6 @@ def _make_container(self, container_name):

try:
stat = os.stat(full_path)
if not os.path.isdir(full_path):
raise OSError("Target path \"{}\" is not a directory".format(full_path))
except OSError:
raise OSError("Target path \"{}\" is not accessible or does not exist".format(full_path))

Expand Down Expand Up @@ -2422,12 +2460,14 @@ def iterate_containers(self):
continue
yield self._make_container(container_name)

def _get_objects(self, container):
def _get_objects(self, container, prefix=None):
"""
Recursively iterate through the file-system and return the object names
"""

cpath = self.get_container_cdn_url(container, check=True)
if prefix:
cpath += "/" + prefix

for folder, subfolders, files in os.walk(cpath, topdown=True):
# Remove unwanted subfolders
Expand All @@ -2440,17 +2480,20 @@ def _get_objects(self, container):
object_name = os.path.relpath(full_path, start=cpath)
yield self._make_object(container, object_name)

def iterate_container_objects(self, container):
def iterate_container_objects(self, container, prefix=None):
"""
Returns a generator of objects for the given container.
:param container: Container instance
:type container: :class:`Container`
:param prefix: The path of a sub directory under the base container path.
The iterator will only include paths under that subdir.
:type prefix: Optional[str]
:return: A generator of Object instances.
"""

return self._get_objects(container)
return self._get_objects(container, prefix=prefix)

def get_container(self, container_name, **_):
"""
Expand Down Expand Up @@ -2750,8 +2793,8 @@ def delete_container(self, container):

return True

def list_container_objects(self, container, **kwargs):
return list(self.iterate_container_objects(container))
def list_container_objects(self, container, ex_prefix=None, **kwargs):
return list(self.iterate_container_objects(container, prefix=ex_prefix))

@staticmethod
def _read_in_chunks(iterator, chunk_size=None, fill_size=False, yield_empty=False):
Expand Down Expand Up @@ -2827,6 +2870,9 @@ def get_direct_access(self, remote_path, **_):
def test_upload(self, test_path, config, **kwargs):
return True

def exists_file(self, container_name, object_name):
return os.path.isfile(object_name)


def normalize_local_path(local_path):
"""
Expand Down
16 changes: 6 additions & 10 deletions clearml/storage/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,8 @@ def exists_file(cls, remote_url):
try:
if remote_url.endswith("/"):
return False
if remote_url.startswith("file://"):
return os.path.isfile(remote_url[len("file://"):])
if remote_url.startswith(("http://", "https://")):
return requests.head(remote_url).ok
helper = StorageHelper.get(remote_url)
obj = helper.get_object(remote_url)
if not obj:
return False
return True
return helper.exists_file(remote_url)
except Exception:
return False

Expand Down Expand Up @@ -469,7 +462,7 @@ def list(cls, remote_url, return_full_path=False, with_metadata=False):
return helper_list_result

@classmethod
def get_metadata(cls, remote_url):
def get_metadata(cls, remote_url, return_full_path=False):
# type: (str) -> Optional[dict]
"""
Get the metadata of the a remote object.
Expand All @@ -485,4 +478,7 @@ def get_metadata(cls, remote_url):
obj = helper.get_object(remote_url)
if not obj:
return None
return helper.get_object_metadata(obj)
metadata = helper.get_object_metadata(obj)
if return_full_path and not metadata["name"].startswith(helper.base_url):
metadata["name"] = helper.base_url + ("/" if not helper.base_url.endswith("/") else "") + metadata["name"]
return metadata

0 comments on commit 16c8a03

Please sign in to comment.