Skip to content

Commit

Permalink
Fix issue in download that prevented CreateDirProcessor from running …
Browse files Browse the repository at this point in the history
…without a query string. Also added bdbag `strict` mode checking for bag creation in download (requires `bdbag>=1.7.3`) so that the creation of empty invalid bags throws an exception.

Added a pre-flight authentication check in deriva_upload.py that will cause an exception to be thrown before initiation of the upload sequence, if no user identity is found.
  • Loading branch information
mikedarcy committed May 14, 2024
1 parent 8555ba9 commit 23f254e
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 26 deletions.
6 changes: 4 additions & 2 deletions deriva/transfer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from deriva.transfer.download.deriva_download import DerivaDownload, GenericDownloader, DerivaDownloadError, \
DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError
DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, \
DerivaDownloadBaggingError
from deriva.transfer.download.deriva_download_cli import DerivaDownloadCLI

from deriva.transfer.upload.deriva_upload import DerivaUpload, GenericUploader, DerivaUploadError, DerivaUploadError, \
DerivaUploadConfigurationError, DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError
DerivaUploadConfigurationError, DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, \
DerivaUploadAuthenticationError
from deriva.transfer.upload.deriva_upload_cli import DerivaUploadCLI

from deriva.transfer.backup.deriva_backup import DerivaBackup, DerivaBackupAuthenticationError, \
Expand Down
4 changes: 4 additions & 0 deletions deriva/transfer/download/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ class DerivaDownloadAuthorizationError(Exception):

class DerivaDownloadTimeoutError(Exception):
pass


class DerivaDownloadBaggingError(Exception):
pass
46 changes: 33 additions & 13 deletions deriva/transfer/download/deriva_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
import requests
from requests.exceptions import HTTPError
from bdbag import bdbag_api as bdb, bdbag_ro as ro, BAG_PROFILE_TAG, BDBAG_RO_PROFILE_ID
from bdbag.bdbagit import BagValidationError
from deriva.core import ErmrestCatalog, HatracStore, format_exception, get_credential, format_credential, read_config, \
stob, Megabyte, __version__ as VERSION
from deriva.core.utils.version_utils import get_installed_version
from deriva.transfer.download.processors import find_query_processor, find_transform_processor, find_post_processor
from deriva.transfer.download.processors.base_processor import LOCAL_PATH_KEY, REMOTE_PATHS_KEY, SERVICE_URL_KEY, \
FILE_SIZE_KEY
from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \
DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError
DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, \
DerivaDownloadBaggingError


logger = logging.getLogger(__name__)


class DerivaDownload(object):
Expand Down Expand Up @@ -43,7 +48,7 @@ def __init__(self, server, **kwargs):
info = "%s v%s [Python %s, %s]" % (
self.__class__.__name__, get_installed_version(VERSION),
platform.python_version(), platform.platform(aliased=True))
logging.info("Initializing downloader: %s" % info)
logger.info("Initializing downloader: %s" % info)

if not self.server:
raise DerivaDownloadConfigurationError("Server not specified!")
Expand Down Expand Up @@ -145,12 +150,12 @@ def download(self, **kwargs):
try:
if not self.credentials:
self.set_credentials(get_credential(self.hostname))
logging.info("Validating credentials for host: %s" % self.hostname)
logger.info("Validating credentials for host: %s" % self.hostname)
attributes = self.catalog.get_authn_session().json()
identity = attributes["client"]
except HTTPError as he:
if he.response.status_code == 404:
logging.info("No existing login session found for host: %s" % self.hostname)
logger.info("No existing login session found for host: %s" % self.hostname)
except Exception as e:
raise DerivaDownloadAuthenticationError("Unable to validate credentials: %s" % format_exception(e))
wallet = kwargs.get("wallet", {})
Expand All @@ -160,6 +165,7 @@ def download(self, **kwargs):
bag_archiver = None
bag_algorithms = None
bag_idempotent = False
bag_strict = True
bag_config = self.config.get('bag')
create_bag = True if bag_config else False
if create_bag:
Expand All @@ -171,7 +177,8 @@ def download(self, **kwargs):
bag_idempotent = stob(bag_config.get('bag_idempotent', False))
bag_metadata = bag_config.get('bag_metadata', {"Internal-Sender-Identifier":
"deriva@%s" % self.server_url})
bag_ro = create_bag and not bag_idempotent and stob(bag_config.get('bag_ro', "True"))
bag_ro = create_bag and not bag_idempotent and stob(bag_config.get('bag_ro', True))
bag_strict = stob(bag_config.get('bag_strict', True))
if create_bag:
bdb.ensure_bag_path_exists(bag_path)
bag = bdb.make_bag(bag_path, algs=bag_algorithms, metadata=bag_metadata, idempotent=bag_idempotent)
Expand Down Expand Up @@ -211,12 +218,13 @@ def download(self, **kwargs):
allow_anonymous=self.allow_anonymous,
timeout=self.timeout)
outputs = processor.process()
assert outputs is not None
if processor.should_abort():
raise DerivaDownloadTimeoutError("Timeout (%s seconds) waiting for processor [%s] to complete." %
(self.timeout_secs, processor_name))
self.check_payload_size(outputs)
except Exception as e:
logging.error(format_exception(e))
logger.error(format_exception(e))
if create_bag:
bdb.cleanup_bag(bag_path)
if remote_file_manifest and os.path.isfile(remote_file_manifest):
Expand Down Expand Up @@ -270,16 +278,27 @@ def download(self, **kwargs):
remote_file_manifest=remote_file_manifest
if (remote_file_manifest and os.path.getsize(remote_file_manifest) > 0) else None,
update=True,
idempotent=bag_idempotent)
idempotent=bag_idempotent,
strict=bag_strict)
except BagValidationError as bve:
msg = "Unable to validate bag.%s Error: %s" % (
"" if not bag_strict else
" Strict checking has been enabled, which most likely means that this bag "
"is empty (has no payload files or fetch references) and therefore invalid.",
format_exception(bve))
logger.error(msg)
bdb.cleanup_bag(bag_path)
raise DerivaDownloadBaggingError(msg)
except Exception as e:
logging.fatal("Exception while updating bag manifests: %s" % format_exception(e))
msg = "Unhandled exception while updating bag manifests: %s" % format_exception(e)
logger.error(msg)
bdb.cleanup_bag(bag_path)
raise
raise DerivaDownloadBaggingError(msg)
finally:
if remote_file_manifest and os.path.isfile(remote_file_manifest):
os.remove(remote_file_manifest)

logging.info('Created bag: %s' % bag_path)
logger.info('Created bag: %s' % bag_path)

if bag_archiver is not None:
try:
Expand All @@ -289,8 +308,9 @@ def download(self, **kwargs):
bdb.cleanup_bag(bag_path)
outputs = {os.path.basename(archive): {LOCAL_PATH_KEY: archive}}
except Exception as e:
logging.error("Exception while creating data bag archive: %s" % format_exception(e))
raise
msg = "Exception while creating data bag archive: %s" % format_exception(e)
logger.error(msg)
raise DerivaDownloadBaggingError(msg)
else:
outputs = {os.path.basename(bag_path): {LOCAL_PATH_KEY: bag_path}}

Expand Down Expand Up @@ -318,7 +338,7 @@ def download(self, **kwargs):
(self.timeout_secs, processor_name))
self.check_payload_size(outputs)
except Exception as e:
logging.error(format_exception(e))
logger.error(format_exception(e))
raise

return outputs
Expand Down
5 changes: 3 additions & 2 deletions deriva/transfer/download/deriva_download_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from requests.exceptions import HTTPError, ConnectionError
from deriva.transfer import GenericDownloader
from deriva.transfer.download import DerivaDownloadError, DerivaDownloadConfigurationError, \
DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError
DerivaDownloadAuthenticationError, DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, \
DerivaDownloadBaggingError
from deriva.core import BaseCLI, KeyValuePairArgs, format_credential, format_exception, urlparse


Expand Down Expand Up @@ -71,7 +72,7 @@ def main(self):
raise DerivaDownloadAuthorizationError(
"A requested operation was forbidden. Server responded: %s" % e)
except (DerivaDownloadError, DerivaDownloadConfigurationError, DerivaDownloadAuthenticationError,
DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError) as e:
DerivaDownloadAuthorizationError, DerivaDownloadTimeoutError, DerivaDownloadBaggingError) as e:
sys.stderr.write(("\n" if not args.quiet else "") + format_exception(e))
if args.debug:
traceback.print_exc()
Expand Down
16 changes: 10 additions & 6 deletions deriva/transfer/download/processors/query/base_query_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(self, envars=None, **kwargs):
self.catalog = kwargs["catalog"]
self.store = kwargs["store"]
self.base_path = kwargs["base_path"]
self.query = self.parameters["query_path"]
if self.envars:
self.query = self.parameters.get("query_path", "")
if self.query and self.envars:
self.query = self.query.format(**self.envars)
self.sub_path = self.parameters.get("output_path")
self.output_filename = self.parameters.get("output_filename")
Expand Down Expand Up @@ -59,6 +59,9 @@ def process(self):
return self.outputs

def catalogQuery(self, headers=None, as_file=True):
if not self.query:
return {}

if not headers:
headers = self.HEADERS.copy()
else:
Expand Down Expand Up @@ -175,8 +178,9 @@ def create_default_paths(self):
envars=self.envars)

def __del__(self):
for session in self.sessions.values():
session.close()
if self.sessions:
for session in self.sessions.values():
session.close()


class CSVQueryProcessor(BaseQueryProcessor):
Expand Down Expand Up @@ -228,8 +232,8 @@ def __init__(self, envars=None, **kwargs):
self.ext = ""

def process(self):
outputs = super(CreateDirProcessor, self).process()
super(CreateDirProcessor, self).process()
self.create_default_paths()
make_dirs(self.output_abspath)

return outputs
return self.outputs
4 changes: 4 additions & 0 deletions deriva/transfer/upload/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ class DerivaUploadCatalogCreateError (RuntimeError):

class DerivaUploadCatalogUpdateError (RuntimeError):
pass


class DerivaUploadAuthenticationError (RuntimeError):
pass
4 changes: 4 additions & 0 deletions deriva/transfer/upload/deriva_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ def getAssetMapping(self, file_path):
return None, None, None

def uploadFiles(self, status_callback=None, file_callback=None):
if not self.identity:
raise DerivaUploadAuthenticationError("Unable to determine user identity for %s. "
"Please ensure that you are authenticated successfully." %
self.server_url)
completed = 0
for group, assets in self.file_list.items():
if self.cancelled:
Expand Down
4 changes: 2 additions & 2 deletions deriva/transfer/upload/deriva_upload_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import traceback
from deriva.transfer import DerivaUpload, DerivaUploadError, DerivaUploadConfigurationError, \
DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError
DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, DerivaUploadAuthenticationError
from deriva.core import BaseCLI, write_config, format_credential, format_exception, urlparse


Expand Down Expand Up @@ -95,7 +95,7 @@ def main(self):
args.dry_run,
args.output_file)
except (RuntimeError, FileNotFoundError, DerivaUploadError, DerivaUploadConfigurationError,
DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError) as e:
DerivaUploadCatalogCreateError, DerivaUploadCatalogUpdateError, DerivaUploadAuthenticationError) as e:
sys.stderr.write(("\n" if not args.quiet else "") + format_exception(e))
if args.debug:
traceback.print_exc()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_readme_contents():
'pika',
'urllib3>=1.26,<3',
'portalocker>=1.2.1',
'bdbag>=1.7.2',
'bdbag>=1.7.3',
'globus_sdk>=3,<4',
'fair-research-login>=0.3.1',
'fair-identifiers-client>=0.5.1',
Expand Down

0 comments on commit 23f254e

Please sign in to comment.