From 23f254e9135edcb357f60ecb47fe6fca0cd4f296 Mon Sep 17 00:00:00 2001 From: mikedarcy Date: Tue, 14 May 2024 13:15:57 -0700 Subject: [PATCH] Fix issue in download that prevented CreateDirProcessor from running without a query string. Also added bdbag `strict` mode checking for bag creation in download (requires `bdbag>=1.7.3`) so that the creation of empty invalid bags throws an exception. Added a pre-flight authentication check in deriva_upload.py that will cause an exception to be thrown before initiation of the upload sequence, if no user identity is found. --- deriva/transfer/__init__.py | 6 ++- deriva/transfer/download/__init__.py | 4 ++ deriva/transfer/download/deriva_download.py | 46 +++++++++++++------ .../transfer/download/deriva_download_cli.py | 5 +- .../processors/query/base_query_processor.py | 16 ++++--- deriva/transfer/upload/__init__.py | 4 ++ deriva/transfer/upload/deriva_upload.py | 4 ++ deriva/transfer/upload/deriva_upload_cli.py | 4 +- setup.py | 2 +- 9 files changed, 65 insertions(+), 26 deletions(-) diff --git a/deriva/transfer/__init__.py b/deriva/transfer/__init__.py index 653b03d6..14eee584 100644 --- a/deriva/transfer/__init__.py +++ b/deriva/transfer/__init__.py @@ -1,9 +1,11 @@ from deriva.transfer.download.deriva_download import DerivaDownload, GenericDownloader, DerivaDownloadError, \ - DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError + DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, \ + DerivaDownloadBaggingError from deriva.transfer.download.deriva_download_cli import DerivaDownloadCLI from deriva.transfer.upload.deriva_upload import DerivaUpload, GenericUploader, DerivaUploadError, DerivaUploadError, \ - DerivaUploadConfigurationError, DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError + DerivaUploadConfigurationError, DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, \ + DerivaUploadAuthenticationError from deriva.transfer.upload.deriva_upload_cli import DerivaUploadCLI from deriva.transfer.backup.deriva_backup import DerivaBackup, DerivaBackupAuthenticationError, \ diff --git a/deriva/transfer/download/__init__.py b/deriva/transfer/download/__init__.py index 5b9004aa..af2fbb17 100644 --- a/deriva/transfer/download/__init__.py +++ b/deriva/transfer/download/__init__.py @@ -16,3 +16,7 @@ class DerivaDownloadAuthorizationError(Exception): class DerivaDownloadTimeoutError(Exception): pass + + +class DerivaDownloadBaggingError(Exception): + pass diff --git a/deriva/transfer/download/deriva_download.py b/deriva/transfer/download/deriva_download.py index d38242ab..d70c07ed 100644 --- a/deriva/transfer/download/deriva_download.py +++ b/deriva/transfer/download/deriva_download.py @@ -7,6 +7,7 @@ import requests from requests.exceptions import HTTPError from bdbag import bdbag_api as bdb, bdbag_ro as ro, BAG_PROFILE_TAG, BDBAG_RO_PROFILE_ID +from bdbag.bdbagit import BagValidationError from deriva.core import ErmrestCatalog, HatracStore, format_exception, get_credential, format_credential, read_config, \ stob, Megabyte, __version__ as VERSION from deriva.core.utils.version_utils import get_installed_version @@ -14,7 +15,11 @@ from deriva.transfer.download.processors.base_processor import LOCAL_PATH_KEY, REMOTE_PATHS_KEY, SERVICE_URL_KEY, \ FILE_SIZE_KEY from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \ - DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError + DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, \ + DerivaDownloadBaggingError + + +logger = logging.getLogger(__name__) class DerivaDownload(object): @@ -43,7 +48,7 @@ def __init__(self, server, **kwargs): info = "%s v%s [Python %s, %s]" % ( self.__class__.__name__, get_installed_version(VERSION), platform.python_version(), platform.platform(aliased=True)) - logging.info("Initializing downloader: %s" % info) + logger.info("Initializing downloader: %s" % info) if not self.server: raise DerivaDownloadConfigurationError("Server not specified!") @@ -145,12 +150,12 @@ def download(self, **kwargs): try: if not self.credentials: self.set_credentials(get_credential(self.hostname)) - logging.info("Validating credentials for host: %s" % self.hostname) + logger.info("Validating credentials for host: %s" % self.hostname) attributes = self.catalog.get_authn_session().json() identity = attributes["client"] except HTTPError as he: if he.response.status_code == 404: - logging.info("No existing login session found for host: %s" % self.hostname) + logger.info("No existing login session found for host: %s" % self.hostname) except Exception as e: raise DerivaDownloadAuthenticationError("Unable to validate credentials: %s" % format_exception(e)) wallet = kwargs.get("wallet", {}) @@ -160,6 +165,7 @@ def download(self, **kwargs): bag_archiver = None bag_algorithms = None bag_idempotent = False + bag_strict = True bag_config = self.config.get('bag') create_bag = True if bag_config else False if create_bag: @@ -171,7 +177,8 @@ def download(self, **kwargs): bag_idempotent = stob(bag_config.get('bag_idempotent', False)) bag_metadata = bag_config.get('bag_metadata', {"Internal-Sender-Identifier": "deriva@%s" % self.server_url}) - bag_ro = create_bag and not bag_idempotent and stob(bag_config.get('bag_ro', "True")) + bag_ro = create_bag and not bag_idempotent and stob(bag_config.get('bag_ro', True)) + bag_strict = stob(bag_config.get('bag_strict', True)) if create_bag: bdb.ensure_bag_path_exists(bag_path) bag = bdb.make_bag(bag_path, algs=bag_algorithms, metadata=bag_metadata, idempotent=bag_idempotent) @@ -211,12 +218,13 @@ def download(self, **kwargs): allow_anonymous=self.allow_anonymous, timeout=self.timeout) outputs = processor.process() + assert outputs is not None if processor.should_abort(): raise DerivaDownloadTimeoutError("Timeout (%s seconds) waiting for processor [%s] to complete." % (self.timeout_secs, processor_name)) self.check_payload_size(outputs) except Exception as e: - logging.error(format_exception(e)) + logger.error(format_exception(e)) if create_bag: bdb.cleanup_bag(bag_path) if remote_file_manifest and os.path.isfile(remote_file_manifest): @@ -270,16 +278,27 @@ def download(self, **kwargs): remote_file_manifest=remote_file_manifest if (remote_file_manifest and os.path.getsize(remote_file_manifest) > 0) else None, update=True, - idempotent=bag_idempotent) + idempotent=bag_idempotent, + strict=bag_strict) + except BagValidationError as bve: + msg = "Unable to validate bag.%s Error: %s" % ( + "" if not bag_strict else + " Strict checking has been enabled, which most likely means that this bag " + "is empty (has no payload files or fetch references) and therefore invalid.", + format_exception(bve)) + logger.error(msg) + bdb.cleanup_bag(bag_path) + raise DerivaDownloadBaggingError(msg) except Exception as e: - logging.fatal("Exception while updating bag manifests: %s" % format_exception(e)) + msg = "Unhandled exception while updating bag manifests: %s" % format_exception(e) + logger.error(msg) bdb.cleanup_bag(bag_path) - raise + raise DerivaDownloadBaggingError(msg) finally: if remote_file_manifest and os.path.isfile(remote_file_manifest): os.remove(remote_file_manifest) - logging.info('Created bag: %s' % bag_path) + logger.info('Created bag: %s' % bag_path) if bag_archiver is not None: try: @@ -289,8 +308,9 @@ def download(self, **kwargs): bdb.cleanup_bag(bag_path) outputs = {os.path.basename(archive): {LOCAL_PATH_KEY: archive}} except Exception as e: - logging.error("Exception while creating data bag archive: %s" % format_exception(e)) - raise + msg = "Exception while creating data bag archive: %s" % format_exception(e) + logger.error(msg) + raise DerivaDownloadBaggingError(msg) else: outputs = {os.path.basename(bag_path): {LOCAL_PATH_KEY: bag_path}} @@ -318,7 +338,7 @@ def download(self, **kwargs): (self.timeout_secs, processor_name)) self.check_payload_size(outputs) except Exception as e: - logging.error(format_exception(e)) + logger.error(format_exception(e)) raise return outputs diff --git a/deriva/transfer/download/deriva_download_cli.py b/deriva/transfer/download/deriva_download_cli.py index ff05d382..9ba3b099 100644 --- a/deriva/transfer/download/deriva_download_cli.py +++ b/deriva/transfer/download/deriva_download_cli.py @@ -7,7 +7,8 @@ from requests.exceptions import HTTPError, ConnectionError from deriva.transfer import GenericDownloader from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \ - DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError + DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, \ + DerivaDownloadBaggingError from deriva.core import BaseCLI, KeyValuePairArgs, format_credential, format_exception, urlparse @@ -71,7 +72,7 @@ def main(self): raise DerivaDownloadAuthorizationError( "A requested operation was forbidden. Server responded: %s" % e) except (DerivaDownloadError, DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError, - DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError) as e: + DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, DerivaDownloadBaggingError) as e: sys.stderr.write(("\n" if not args.quiet else "") + format_exception(e)) if args.debug: traceback.print_exc() diff --git a/deriva/transfer/download/processors/query/base_query_processor.py b/deriva/transfer/download/processors/query/base_query_processor.py index 1536682e..20ee74f0 100644 --- a/deriva/transfer/download/processors/query/base_query_processor.py +++ b/deriva/transfer/download/processors/query/base_query_processor.py @@ -21,8 +21,8 @@ def __init__(self, envars=None, **kwargs): self.catalog = kwargs["catalog"] self.store = kwargs["store"] self.base_path = kwargs["base_path"] - self.query = self.parameters["query_path"] - if self.envars: + self.query = self.parameters.get("query_path", "") + if self.query and self.envars: self.query = self.query.format(**self.envars) self.sub_path = self.parameters.get("output_path") self.output_filename = self.parameters.get("output_filename") @@ -59,6 +59,9 @@ def process(self): return self.outputs def catalogQuery(self, headers=None, as_file=True): + if not self.query: + return {} + if not headers: headers = self.HEADERS.copy() else: @@ -175,8 +178,9 @@ def create_default_paths(self): envars=self.envars) def __del__(self): - for session in self.sessions.values(): - session.close() + if self.sessions: + for session in self.sessions.values(): + session.close() class CSVQueryProcessor(BaseQueryProcessor): @@ -228,8 +232,8 @@ def __init__(self, envars=None, **kwargs): self.ext = "" def process(self): - outputs = super(CreateDirProcessor, self).process() + super(CreateDirProcessor, self).process() self.create_default_paths() make_dirs(self.output_abspath) - return outputs + return self.outputs diff --git a/deriva/transfer/upload/__init__.py b/deriva/transfer/upload/__init__.py index 6b383784..f8fdd780 100644 --- a/deriva/transfer/upload/__init__.py +++ b/deriva/transfer/upload/__init__.py @@ -12,3 +12,7 @@ class DerivaUploadCatalogCreateError (RuntimeError): class DerivaUploadCatalogUpdateError (RuntimeError): pass + + +class DerivaUploadAuthenticationError (RuntimeError): + pass diff --git a/deriva/transfer/upload/deriva_upload.py b/deriva/transfer/upload/deriva_upload.py index 84963e77..d94b197d 100644 --- a/deriva/transfer/upload/deriva_upload.py +++ b/deriva/transfer/upload/deriva_upload.py @@ -557,6 +557,10 @@ def getAssetMapping(self, file_path): return None, None, None def uploadFiles(self, status_callback=None, file_callback=None): + if not self.identity: + raise DerivaUploadAuthenticationError("Unable to determine user identity for %s. " + "Please ensure that you are authenticated successfully." % + self.server_url) completed = 0 for group, assets in self.file_list.items(): if self.cancelled: diff --git a/deriva/transfer/upload/deriva_upload_cli.py b/deriva/transfer/upload/deriva_upload_cli.py index 7a7c26b5..324dc499 100644 --- a/deriva/transfer/upload/deriva_upload_cli.py +++ b/deriva/transfer/upload/deriva_upload_cli.py @@ -3,7 +3,7 @@ import json import traceback from deriva.transfer import DerivaUpload, DerivaUploadError, DerivaUploadConfigurationError, \ - DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError + DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, DerivaUploadAuthenticationError from deriva.core import BaseCLI, write_config, format_credential, format_exception, urlparse @@ -95,7 +95,7 @@ def main(self): args.dry_run, args.output_file) except (RuntimeError, FileNotFoundError, DerivaUploadError, DerivaUploadConfigurationError, - DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError) as e: + DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, DerivaUploadAuthenticationError) as e: sys.stderr.write(("\n" if not args.quiet else "") + format_exception(e)) if args.debug: traceback.print_exc() diff --git a/setup.py b/setup.py index 37daba2f..08b09bbb 100644 --- a/setup.py +++ b/setup.py @@ -67,7 +67,7 @@ def get_readme_contents(): 'pika', 'urllib3>=1.26,<3', 'portalocker>=1.2.1', - 'bdbag>=1.7.2', + 'bdbag>=1.7.3', 'globus_sdk>=3,<4', 'fair-research-login>=0.3.1', 'fair-identifiers-client>=0.5.1',