From c4f9ca8268d42ecb985b916f2478691ebe162cfb Mon Sep 17 00:00:00 2001 From: afabiani Date: Sun, 10 Apr 2022 12:21:32 +0200 Subject: [PATCH] [Fixes #9064] Improve Upload Workflow resources state management --- geonode/upload/upload.py | 754 ++++++++++++++++++++------------------- geonode/upload/views.py | 137 ++++--- 2 files changed, 447 insertions(+), 444 deletions(-) diff --git a/geonode/upload/upload.py b/geonode/upload/upload.py index dc8dcd1ee66..8a6faa61054 100644 --- a/geonode/upload/upload.py +++ b/geonode/upload/upload.py @@ -53,9 +53,10 @@ from geonode import GeoNodeException from geonode.base import enumerations +from geonode.upload import LayerNotReady +from geonode.tasks.tasks import AcquireLock from geonode.layers.models import TIME_REGEX_FORMAT from geonode.resource.manager import resource_manager -from geonode.upload import LayerNotReady from geonode.upload.api.exceptions import GeneralUploadException from ..layers.models import Dataset @@ -277,143 +278,146 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr time_presentation_default_value=None, time_presentation_reference_value=None, charset_encoding="UTF-8", target_store=None): - logger.debug( - f'Uploading layer: {layer}, files {spatial_files}') - if len(spatial_files) > 1: - # we only support more than one file if they're rasters for mosaicing - if not all( - [f.file_type.dataset_type == 'coverage' for f in spatial_files]): - msg = "Please upload only one type of file at a time" - logger.exception(Exception(msg)) - raise GeneralUploadException(detail=msg) - name = get_valid_dataset_name(layer, overwrite) - logger.debug(f'Name for layer: {name}') - if not any(spatial_files.all_files()): - msg = "Unable to recognize the uploaded file(s)" - logger.exception(Exception(msg)) - raise GeneralUploadException(detail=msg) - the_dataset_type = get_dataset_type(spatial_files) - _check_geoserver_store(name, the_dataset_type, overwrite) - if the_dataset_type not in ( - FeatureType.resource_type, - Coverage.resource_type): - msg = f"Expected layer type to FeatureType or Coverage, not {the_dataset_type}" - logger.exception(Exception(msg)) - raise GeneralUploadException(msg) - files_to_upload = preprocess_files(spatial_files) - logger.debug(f"files_to_upload: {files_to_upload}") - logger.debug(f'Uploading {the_dataset_type}') - error_msg = None - try: - next_id = _get_next_id() - # Truncate name to maximum length defined by the field. - max_length = Upload._meta.get_field('name').max_length - name = name[:max_length] - # save record of this whether valid or not - will help w/ debugging - upload, _ = Upload.objects.get_or_create( - user=user, - name=name, - state=enumerations.STATE_READY, - upload_dir=spatial_files.dirname - ) - upload.store_spatial_files = store_spatial_files - - # @todo settings for use_url or auto detection if geoserver is - # on same host - - # Is it a regular file or an ImageMosaic? 
- # if mosaic_time_regex and mosaic_time_value: - if mosaic: # we want to ingest as ImageMosaic - target_store, files_to_upload = utils.import_imagemosaic_granules( - spatial_files, - append_to_mosaic_opts, - append_to_mosaic_name, - mosaic_time_regex, - mosaic_time_value, - time_presentation, - time_presentation_res, - time_presentation_default_value, - time_presentation_reference_value) - upload.mosaic = mosaic - upload.append_to_mosaic_opts = append_to_mosaic_opts - upload.append_to_mosaic_name = append_to_mosaic_name - upload.mosaic_time_regex = mosaic_time_regex - upload.mosaic_time_value = mosaic_time_value - # moving forward with a regular Importer session - if len(files_to_upload) > 1: - import_session = gs_uploader.upload_files( - files_to_upload[1:], - use_url=False, - # import_id=next_id, - target_store=target_store, - charset_encoding=charset_encoding - ) - else: - import_session = gs_uploader.upload_files( - files_to_upload, - use_url=False, - # import_id=next_id, - target_store=target_store, - charset_encoding=charset_encoding + lock_id = 'upload-workflow-save_step' + with AcquireLock(lock_id, blocking=True) as lock: + if lock.acquire() is True: + logger.debug( + f'Uploading layer: {layer}, files {spatial_files}') + if len(spatial_files) > 1: + # we only support more than one file if they're rasters for mosaicing + if not all( + [f.file_type.dataset_type == 'coverage' for f in spatial_files]): + msg = "Please upload only one type of file at a time" + logger.exception(Exception(msg)) + raise GeneralUploadException(detail=msg) + name = get_valid_dataset_name(layer, overwrite) + logger.debug(f'Name for layer: {name}') + if not any(spatial_files.all_files()): + msg = "Unable to recognize the uploaded file(s)" + logger.exception(Exception(msg)) + raise GeneralUploadException(detail=msg) + the_dataset_type = get_dataset_type(spatial_files) + _check_geoserver_store(name, the_dataset_type, overwrite) + if the_dataset_type not in ( + FeatureType.resource_type, + Coverage.resource_type): + msg = f"Expected layer type to FeatureType or Coverage, not {the_dataset_type}" + logger.exception(Exception(msg)) + raise GeneralUploadException(msg) + files_to_upload = preprocess_files(spatial_files) + logger.debug(f"files_to_upload: {files_to_upload}") + logger.debug(f'Uploading {the_dataset_type}') + error_msg = None + try: + next_id = _get_next_id() + # Truncate name to maximum length defined by the field. 
+                max_length = Upload._meta.get_field('name').max_length
+                name = name[:max_length]
+                # save record of this whether valid or not - will help w/ debugging
+                upload, _ = Upload.objects.get_or_create(
+                    user=user,
+                    name=name,
+                    state=enumerations.STATE_READY,
+                    upload_dir=spatial_files.dirname
+                )
+                upload.store_spatial_files = store_spatial_files
+
+                # @todo settings for use_url or auto detection if geoserver is
+                # on same host
+
+                # Is it a regular file or an ImageMosaic?
+ # if mosaic_time_regex and mosaic_time_value: + if mosaic: # we want to ingest as ImageMosaic + target_store, files_to_upload = utils.import_imagemosaic_granules( + spatial_files, + append_to_mosaic_opts, + append_to_mosaic_name, + mosaic_time_regex, + mosaic_time_value, + time_presentation, + time_presentation_res, + time_presentation_default_value, + time_presentation_reference_value) + upload.mosaic = mosaic + upload.append_to_mosaic_opts = append_to_mosaic_opts + upload.append_to_mosaic_name = append_to_mosaic_name + upload.mosaic_time_regex = mosaic_time_regex + upload.mosaic_time_value = mosaic_time_value + # moving forward with a regular Importer session + if len(files_to_upload) > 1: + import_session = gs_uploader.upload_files( + files_to_upload[1:], + use_url=False, + # import_id=next_id, + target_store=target_store, + charset_encoding=charset_encoding + ) + else: + import_session = gs_uploader.upload_files( + files_to_upload, + use_url=False, + # import_id=next_id, + target_store=target_store, + charset_encoding=charset_encoding + ) + next_id = import_session.id if import_session else None + if not next_id: + error_msg = 'No valid Importer Session could be found' + else: + # moving forward with a regular Importer session + import_session = gs_uploader.upload_files( + files_to_upload, + use_url=False, + import_id=next_id, + mosaic=False, + target_store=target_store, + name=name, + charset_encoding=charset_encoding + ) + upload.import_id = import_session.id + upload.save() + + # any unrecognized tasks/files must be deleted or we can't proceed + import_session.delete_unrecognized_tasks() + + if not mosaic: + if not import_session.tasks: + error_msg = 'No valid upload files could be found' + if import_session.tasks: + if import_session.tasks[0].state == 'NO_FORMAT' \ + or import_session.tasks[0].state == 'BAD_FORMAT': + error_msg = 'There may be a problem with the data provided - ' \ + 'we could not identify its format' + + if not mosaic and len(import_session.tasks) > 1: + error_msg = "Only a single upload is supported at the moment" + + if not error_msg and import_session.tasks: + task = import_session.tasks[0] + # single file tasks will have just a file entry + if hasattr(task, 'files'): + # @todo gsimporter - test this + if not all([hasattr(f, 'timestamp') + for f in task.source.files]): + error_msg = ( + "Not all timestamps could be recognized." 
+                                " Please ensure your files contain the correct formats.")
+
+                if error_msg:
+                    upload.set_processing_state(enumerations.STATE_INVALID)
+
+                # @todo once the random tmp9723481758915 type of name is not
+                # around, need to track the name computed above, for now, the
+                # target store name can be used
+            except Exception as e:
+                logger.exception(e)
+                raise e
 
-    if error_msg:
-        logger.exception(Exception(error_msg))
-        raise GeneralUploadException(detail=error_msg)
-    else:
-        _log("Finished upload of [%s] to GeoServer without errors.", name)
+            if error_msg:
+                logger.exception(Exception(error_msg))
+                raise GeneralUploadException(detail=error_msg)
+            else:
+                _log("Finished upload of [%s] to GeoServer without errors.", name)
 
     return import_session, upload
 
@@ -570,256 +574,260 @@ def final_step(upload_session, user, charset="UTF-8", dataset_id=None):
     import_session = upload_session.import_session
     import_id = import_session.id
-    _log(f'Reloading session {import_id} to check validity')
-    try:
-        import_session = import_session.reload()
-    except gsimporter.api.NotFound as e:
-        logger.exception(e)
-        Upload.objects.invalidate_from_session(upload_session)
-        raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0])
-
-    if Upload.objects.filter(import_id=import_id).count():
-        Upload.objects.filter(import_id=import_id).update(complete=False)
-        upload = Upload.objects.filter(import_id=import_id).get()
-        if upload.state == enumerations.STATE_RUNNING:
-            return
-
-    upload_session.import_session = import_session
-    Upload.objects.update_from_session(upload_session)
+    saved_dataset = None
 
-    # Create the style and assign it to the created resource
-    # FIXME: Put this in gsconfig.py
-    task = import_session.tasks[0]
-    task.set_charset(charset)
+    lock_id = f'{upload_session.name}-{import_session.id}'
+    with AcquireLock(lock_id, blocking=False) as lock:
+        if lock.acquire() is True:
+            _log(f'Reloading session {import_id} to check validity')
+            try:
+                import_session = import_session.reload()
+            except gsimporter.api.NotFound as e:
+                logger.exception(e)
+                Upload.objects.invalidate_from_session(upload_session)
+                raise GeneralUploadException(detail=_("The GeoServer Import Session is no longer available ") + e.args[0])
 
-    # @todo see above in save_step, regarding computed unique name
-    name = task.layer.name
+            if Upload.objects.filter(import_id=import_id).count():
+                Upload.objects.filter(import_id=import_id).update(complete=False)
+                upload = Upload.objects.filter(import_id=import_id).get()
+                if upload.state == enumerations.STATE_RUNNING:
+                    return
 
-    if dataset_id:
-        name = Dataset.objects.get(resourcebase_ptr_id=dataset_id).name
+            upload_session.import_session = import_session
+            Upload.objects.update_from_session(upload_session)
 
-    _log(f'Getting from catalog [{name}]')
-    try:
-        # the importer chooses an available featuretype name late in the game need
-        # to verify the resource.name otherwise things will fail. This happens
-        # when the same data is uploaded a second time and the default name is
-        # chosen
-        gs_catalog.get_layer(name)
-    except Exception:
-        Upload.objects.invalidate_from_session(upload_session)
-        raise LayerNotReady(
-            _(f"Expected to find layer named '{name}' in geoserver"))
+            # Create the style and assign it to the created resource
+            # FIXME: Put this in gsconfig.py
+            task = import_session.tasks[0]
+            task.set_charset(charset)
 
-    if import_session.state == 'READY' or (import_session.state == 'PENDING' and task.state == 'READY'):
-        import_session.commit()
-    elif import_session.state == 'INCOMPLETE' and task.state != 'ERROR':
-        Upload.objects.invalidate_from_session(upload_session)
-        raise Exception(f'unknown item state: {task.state}')
-    try:
-        import_session = import_session.reload()
-    except gsimporter.api.NotFound as e:
-        logger.exception(e)
-        Upload.objects.invalidate_from_session(upload_session)
-        raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0])
-    upload_session.import_session = import_session
-    Upload.objects.update_from_session(upload_session)
+            # @todo see above in save_step, regarding computed unique name
+            name = task.layer.name
 
-    _log(f'Creating Django record for [{name}]')
-    target = task.target
-    alternate = task.get_target_layer_name()
-    dataset_uuid = None
-    title = upload_session.dataset_title
-    abstract = upload_session.dataset_abstract
+            if dataset_id:
+                name = Dataset.objects.get(resourcebase_ptr_id=dataset_id).name
 
-    metadata_uploaded = False
-    xml_file = upload_session.base_file[0].xml_files
-    if xml_file:
-        try:
-            # get model properties from XML
-            # If it's contained within a zip, need to extract it
-            if upload_session.base_file.archive:
-                archive = upload_session.base_file.archive
-                zf = zipfile.ZipFile(archive, 'r', allowZip64=True)
-                zf.extract(xml_file[0], os.path.dirname(archive))
-                # Assign the absolute path to this file
-                xml_file = f"{os.path.dirname(archive)}/{xml_file[0]}"
-
-            # Sanity checks
-            if isinstance(xml_file, list):
-                if len(xml_file) > 0:
-                    xml_file = xml_file[0]
-                else:
-                    xml_file = None
-            elif not isinstance(xml_file, str):
-                xml_file = None
-
-            if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK):
-                dataset_uuid, vals, regions, keywords, custom = parse_metadata(
-                    open(xml_file).read())
-                metadata_uploaded = True
-        except Exception as e:
-            Upload.objects.invalidate_from_session(upload_session)
-            logger.error(e)
-            raise GeoNodeException(
-                _("Exception occurred while parsing the provided Metadata file."), e)
-
-    # look for SLD
-    sld_file = upload_session.base_file[0].sld_files
-    sld_uploaded = False
-    if sld_file:
-        # If it's contained within a zip, need to extract it
-        if upload_session.base_file.archive:
-            archive = upload_session.base_file.archive
-            logger.debug(f'using uploaded sld file from {archive}')
-            zf = zipfile.ZipFile(archive, 'r', allowZip64=True)
-            zf.extract(sld_file[0], os.path.dirname(archive), path=upload_session.tempdir)
-            # Assign the absolute path to this file
-            sld_file[0] = f"{os.path.dirname(archive)}/{sld_file[0]}"
-        else:
-            _sld_file = f"{os.path.dirname(upload_session.tempdir)}/{os.path.basename(sld_file[0])}"
-            logger.debug(f"copying [{sld_file[0]}] to [{_sld_file}]")
+            _log(f'Getting from catalog [{name}]')
             try:
-                shutil.copyfile(sld_file[0], _sld_file)
-                sld_file = _sld_file
-            except (IsADirectoryError, shutil.SameFileError) as e:
-                logger.exception(e)
-                sld_file = sld_file[0]
-            except Exception as e:
+                # the importer chooses an available featuretype name late in the game need
+                # to verify the resource.name otherwise things will fail. This happens
+                # when the same data is uploaded a second time and the default name is
+                # chosen
+                gs_catalog.get_layer(name)
+            except Exception:
+                Upload.objects.invalidate_from_session(upload_session)
+                raise LayerNotReady(
+                    _(f"Expected to find layer named '{name}' in geoserver"))
+
+            if import_session.state == 'READY' or (import_session.state == 'PENDING' and task.state == 'READY'):
+                import_session.commit()
+            elif import_session.state == 'INCOMPLETE' and task.state != 'ERROR':
+                Upload.objects.invalidate_from_session(upload_session)
+                raise Exception(f'unknown item state: {task.state}')
+            try:
+                import_session = import_session.reload()
+            except gsimporter.api.NotFound as e:
                 logger.exception(e)
-                raise GeneralUploadException(detail=_('Error uploading Dataset') + e.args[0])
-        sld_uploaded = True
-    else:
+                Upload.objects.invalidate_from_session(upload_session)
+                raise GeneralUploadException(detail=_("The GeoServer Import Session is no longer available ") + e.args[0])
+            upload_session.import_session = import_session
+            Upload.objects.update_from_session(upload_session)
+
+            _log(f'Creating Django record for [{name}]')
+            target = task.target
+            alternate = task.get_target_layer_name()
+            dataset_uuid = None
+            title = upload_session.dataset_title
+            abstract = upload_session.dataset_abstract
+
+            metadata_uploaded = False
+            xml_file = upload_session.base_file[0].xml_files
+            if xml_file:
+                try:
+                    # get model properties from XML
+                    # If it's contained within a zip, need to extract it
+                    if upload_session.base_file.archive:
+                        archive = upload_session.base_file.archive
+                        zf = zipfile.ZipFile(archive, 'r', allowZip64=True)
+                        zf.extract(xml_file[0], os.path.dirname(archive))
+                        # Assign the absolute path to this file
+                        xml_file = f"{os.path.dirname(archive)}/{xml_file[0]}"
+
+                    # Sanity checks
+                    if isinstance(xml_file, list):
+                        if len(xml_file) > 0:
+                            xml_file = xml_file[0]
+                        else:
+                            xml_file = None
+                    elif not isinstance(xml_file, str):
+                        xml_file = None
+
+                    if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK):
+                        dataset_uuid, vals, regions, keywords, custom = parse_metadata(
+                            open(xml_file).read())
+                        metadata_uploaded = True
+                except Exception as e:
+                    Upload.objects.invalidate_from_session(upload_session)
+                    logger.error(e)
+                    raise GeoNodeException(
+                        _("Exception occurred while parsing the provided Metadata file."), e)
+
+            # look for SLD
+            sld_file = upload_session.base_file[0].sld_files
+            sld_uploaded = False
+            if sld_file:
+                # If it's contained within a zip, need to extract it
+                if upload_session.base_file.archive:
+                    archive = upload_session.base_file.archive
+                    logger.debug(f'using uploaded sld file from {archive}')
+                    zf = zipfile.ZipFile(archive, 'r', allowZip64=True)
+                    zf.extract(sld_file[0], os.path.dirname(archive), path=upload_session.tempdir)
+                    # Assign the absolute path to this file
+                    sld_file[0] = f"{os.path.dirname(archive)}/{sld_file[0]}"
+                else:
+                    _sld_file = f"{os.path.dirname(upload_session.tempdir)}/{os.path.basename(sld_file[0])}"
+                    logger.debug(f"copying [{sld_file[0]}] to [{_sld_file}]")
+                    try:
+                        shutil.copyfile(sld_file[0], _sld_file)
+                        sld_file = _sld_file
+                    except (IsADirectoryError, shutil.SameFileError) as e:
+                        logger.exception(e)
+                        sld_file = sld_file[0]
+                    except Exception as e:
+                        logger.exception(e)
+                        raise GeneralUploadException(detail=_('Error uploading Dataset') + e.args[0])
+                sld_uploaded = True
+            else:
-        # get_files will not find the sld if it doesn't match the base name
-        # so we've worked around that in the view - if provided, it will be here
-        if upload_session.import_sld_file:
-            logger.debug('using provided sld file from importer')
-            base_file = upload_session.base_file
-            sld_file = base_file[0].sld_files[0]
-        sld_uploaded = False
-    logger.debug(f'[sld_uploaded: {sld_uploaded}] sld_file: {sld_file}')
-
-    # Make sure the layer does not exists already
-    if dataset_uuid and Dataset.objects.filter(uuid=dataset_uuid).count():
-        Upload.objects.invalidate_from_session(upload_session)
-        logger.error("The UUID identifier from the XML Metadata is already in use in this system.")
-        raise GeoNodeException(
-            _("The UUID identifier from the XML Metadata is already in use in this system."))
-
-    # Is it a regular file or an ImageMosaic?
-    saved_dataset = None
-    is_mosaic = False
-    has_time = has_elevation = False
-    start = end = None
-    if upload_session.mosaic_time_regex and upload_session.mosaic_time_value:
-        has_time = True
-        is_mosaic = True
-        start = datetime.datetime.strptime(upload_session.mosaic_time_value,
-                                           TIME_REGEX_FORMAT[upload_session.mosaic_time_regex])
-        start = pytz.utc.localize(start, is_dst=False)
-        end = start
-    if upload_session.time and upload_session.time_info and upload_session.time_transforms:
-        has_time = True
-
-    if upload_session.append_to_mosaic_opts:
-        # Is it a mosaic or a granule that must be added to an Image Mosaic?
- saved_dataset_filter = Dataset.objects.filter( - name=upload_session.append_to_mosaic_name) - if not saved_dataset_filter.exists(): - saved_dataset = resource_manager.create( - name=upload_session.append_to_mosaic_name, - defaults=dict( - dirty_state=True, - state=enumerations.STATE_READY) - ) - created = True - else: - saved_dataset = saved_dataset_filter.get() - created = False - saved_dataset.set_dirty_state() - if saved_dataset.temporal_extent_start and end: - if pytz.utc.localize( - saved_dataset.temporal_extent_start, - is_dst=False) < end: - saved_dataset.temporal_extent_end = end - Dataset.objects.filter( - name=upload_session.append_to_mosaic_name).update( - temporal_extent_end=end) + Upload.objects.invalidate_from_session(upload_session) + raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0]) + upload_session.import_session = import_session + Upload.objects.update_from_session(upload_session) + + _log(f'Creating Django record for [{name}]') + target = task.target + alternate = task.get_target_layer_name() + dataset_uuid = None + title = upload_session.dataset_title + abstract = upload_session.dataset_abstract + + metadata_uploaded = False + xml_file = upload_session.base_file[0].xml_files + if xml_file: + try: + # get model properties from XML + # If it's contained within a zip, need to extract it + if upload_session.base_file.archive: + archive = upload_session.base_file.archive + zf = zipfile.ZipFile(archive, 'r', allowZip64=True) + zf.extract(xml_file[0], os.path.dirname(archive)) + # Assign the absolute path to this file + xml_file = f"{os.path.dirname(archive)}/{xml_file[0]}" + + # Sanity checks + if isinstance(xml_file, list): + if len(xml_file) > 0: + xml_file = xml_file[0] + else: + xml_file = None + elif not isinstance(xml_file, str): + xml_file = None + + if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK): + dataset_uuid, vals, regions, keywords, custom = parse_metadata( + open(xml_file).read()) + metadata_uploaded = True + except Exception as e: + Upload.objects.invalidate_from_session(upload_session) + logger.error(e) + raise GeoNodeException( + _("Exception occurred while parsing the provided Metadata file."), e) + + # look for SLD + sld_file = upload_session.base_file[0].sld_files + sld_uploaded = False + if sld_file: + # If it's contained within a zip, need to extract it + if upload_session.base_file.archive: + archive = upload_session.base_file.archive + logger.debug(f'using uploaded sld file from {archive}') + zf = zipfile.ZipFile(archive, 'r', allowZip64=True) + zf.extract(sld_file[0], os.path.dirname(archive), path=upload_session.tempdir) + # Assign the absolute path to this file + sld_file[0] = f"{os.path.dirname(archive)}/{sld_file[0]}" + else: + _sld_file = f"{os.path.dirname(upload_session.tempdir)}/{os.path.basename(sld_file[0])}" + logger.debug(f"copying [{sld_file[0]}] to [{_sld_file}]") + try: + shutil.copyfile(sld_file[0], _sld_file) + sld_file = _sld_file + except (IsADirectoryError, shutil.SameFileError) as e: + logger.exception(e) + sld_file = sld_file[0] + except Exception as e: + logger.exception(e) + raise GeneralUploadException(detail=_('Error uploading Dataset') + e.args[0]) + sld_uploaded = True else: - saved_dataset.temporal_extent_start = end - Dataset.objects.filter( - name=upload_session.append_to_mosaic_name).update( - temporal_extent_start=end) - else: - # The dataset is a standard one, no mosaic options enabled... 
- saved_dataset_filter = Dataset.objects.filter( - store=target.name, - alternate=alternate, - workspace=target.workspace_name, - name=task.layer.name) - if not saved_dataset_filter.exists(): - saved_dataset = resource_manager.create( - dataset_uuid, - resource_type=Dataset, - defaults=dict( + # get_files will not find the sld if it doesn't match the base name + # so we've worked around that in the view - if provided, it will be here + if upload_session.import_sld_file: + logger.debug('using provided sld file from importer') + base_file = upload_session.base_file + sld_file = base_file[0].sld_files[0] + sld_uploaded = False + logger.debug(f'[sld_uploaded: {sld_uploaded}] sld_file: {sld_file}') + + # Make sure the layer does not exists already + if dataset_uuid and Dataset.objects.filter(uuid=dataset_uuid).count(): + Upload.objects.invalidate_from_session(upload_session) + logger.error("The UUID identifier from the XML Metadata is already in use in this system.") + raise GeoNodeException( + _("The UUID identifier from the XML Metadata is already in use in this system.")) + + # Is it a regular file or an ImageMosaic? + is_mosaic = False + has_time = has_elevation = False + start = end = None + if upload_session.mosaic_time_regex and upload_session.mosaic_time_value: + has_time = True + is_mosaic = True + start = datetime.datetime.strptime(upload_session.mosaic_time_value, + TIME_REGEX_FORMAT[upload_session.mosaic_time_regex]) + start = pytz.utc.localize(start, is_dst=False) + end = start + if upload_session.time and upload_session.time_info and upload_session.time_transforms: + has_time = True + + if upload_session.append_to_mosaic_opts: + # Is it a mosaic or a granule that must be added to an Image Mosaic? + saved_dataset_filter = Dataset.objects.filter( + name=upload_session.append_to_mosaic_name) + if not saved_dataset_filter.exists(): + saved_dataset = resource_manager.create( + name=upload_session.append_to_mosaic_name, + defaults=dict( + dirty_state=True, + state=enumerations.STATE_READY) + ) + created = True + else: + saved_dataset = saved_dataset_filter.get() + created = False + saved_dataset.set_dirty_state() + if saved_dataset.temporal_extent_start and end: + if pytz.utc.localize( + saved_dataset.temporal_extent_start, + is_dst=False) < end: + saved_dataset.temporal_extent_end = end + Dataset.objects.filter( + name=upload_session.append_to_mosaic_name).update( + temporal_extent_end=end) + else: + saved_dataset.temporal_extent_start = end + Dataset.objects.filter( + name=upload_session.append_to_mosaic_name).update( + temporal_extent_start=end) + else: + # The dataset is a standard one, no mosaic options enabled... 
+ saved_dataset_filter = Dataset.objects.filter( store=target.name, - subtype=get_dataset_storetype(target.store_type), alternate=alternate, workspace=target.workspace_name, - title=title, - name=task.layer.name, - abstract=abstract or _('No abstract provided'), - owner=user, - dirty_state=True, - state=enumerations.STATE_READY, - temporal_extent_start=start, - temporal_extent_end=end, - is_mosaic=is_mosaic, - has_time=has_time, - has_elevation=has_elevation, - time_regex=upload_session.mosaic_time_regex)) - created = True - else: - saved_dataset = saved_dataset_filter.get() - created = False - - assert saved_dataset - - if not created: - return saved_dataset + name=task.layer.name) + if not saved_dataset_filter.exists(): + saved_dataset = resource_manager.create( + dataset_uuid, + resource_type=Dataset, + defaults=dict( + store=target.name, + subtype=get_dataset_storetype(target.store_type), + alternate=alternate, + workspace=target.workspace_name, + title=title, + name=task.layer.name, + abstract=abstract or _('No abstract provided'), + owner=user, + dirty_state=True, + state=enumerations.STATE_READY, + temporal_extent_start=start, + temporal_extent_end=end, + is_mosaic=is_mosaic, + has_time=has_time, + has_elevation=has_elevation, + time_regex=upload_session.mosaic_time_regex)) + created = True + else: + saved_dataset = saved_dataset_filter.get() + created = False - try: - # Update the state from session... - Upload.objects.update_from_session(upload_session, resource=saved_dataset) - - # Hide the dataset until the upload process finishes... - saved_dataset.set_dirty_state() - - # Finalize the upload... - with transaction.atomic(): - # Set default permissions on the newly created layer and send notifications - permissions = upload_session.permissions - - # Finalize Upload - resource_manager.set_permissions( - None, instance=saved_dataset, permissions=permissions, created=created) - resource_manager.update( - None, instance=saved_dataset, xml_file=xml_file, metadata_uploaded=metadata_uploaded) - resource_manager.exec( - 'set_style', None, instance=saved_dataset, sld_uploaded=sld_uploaded, sld_file=sld_file, tempdir=upload_session.tempdir) - resource_manager.exec( - 'set_time_info', None, instance=saved_dataset, time_info=upload_session.time_info) - resource_manager.set_thumbnail( - None, instance=saved_dataset) - - if Upload.objects.filter(resource=saved_dataset.get_self_resource()).exists(): - Upload.objects.filter(resource=saved_dataset.get_self_resource()).update(complete=True) - [u.set_processing_state(enumerations.STATE_PROCESSED) for u in Upload.objects.filter(resource=saved_dataset.get_self_resource())] - except Exception as e: - raise GeoNodeException(e) - finally: - if upload.state in (enumerations.STATE_PROCESSED, enumerations.STATE_INVALID): - # Get rid if temporary files that have been uploaded via Upload form - logger.debug(f"... Cleaning up the temporary folders {upload_session.tempdir}") - shutil.rmtree(upload_session.tempdir, ignore_errors=True) + assert saved_dataset + + if not created: + return saved_dataset + + try: + # Update the state from session... + Upload.objects.update_from_session(upload_session, resource=saved_dataset) + + # Hide the dataset until the upload process finishes... + saved_dataset.set_dirty_state() + + # Finalize the upload... 
+                with transaction.atomic():
+                    # Set default permissions on the newly created layer and send notifications
+                    permissions = upload_session.permissions
+
+                    # Finalize Upload
+                    resource_manager.set_permissions(
+                        None, instance=saved_dataset, permissions=permissions, created=created)
+                    resource_manager.update(
+                        None, instance=saved_dataset, xml_file=xml_file, metadata_uploaded=metadata_uploaded)
+                    resource_manager.exec(
+                        'set_style', None, instance=saved_dataset, sld_uploaded=sld_uploaded, sld_file=sld_file, tempdir=upload_session.tempdir)
+                    resource_manager.exec(
+                        'set_time_info', None, instance=saved_dataset, time_info=upload_session.time_info)
+                    resource_manager.set_thumbnail(
+                        None, instance=saved_dataset)
+
+                if Upload.objects.filter(resource=saved_dataset.get_self_resource()).exists():
+                    Upload.objects.filter(resource=saved_dataset.get_self_resource()).update(complete=True)
+                    [u.set_processing_state(enumerations.STATE_PROCESSED) for u in Upload.objects.filter(resource=saved_dataset.get_self_resource())]
+            except Exception as e:
+                raise GeoNodeException(e)
+            finally:
+                if upload.state in (enumerations.STATE_PROCESSED, enumerations.STATE_INVALID):
+                    # Get rid of temporary files that have been uploaded via Upload form
+                    logger.debug(f"... Cleaning up the temporary folders {upload_session.tempdir}")
+                    shutil.rmtree(upload_session.tempdir, ignore_errors=True)
 
     return saved_dataset
 
diff --git a/geonode/upload/views.py b/geonode/upload/views.py
index 0a1cc0b3a90..952a031cc63 100644
--- a/geonode/upload/views.py
+++ b/geonode/upload/views.py
@@ -540,78 +540,73 @@ def final_step_view(req, upload_session):
     if not upload_session:
         upload_session = _get_upload_session(req)
     if upload_session and getattr(upload_session, 'import_session', None):
-        from geonode.tasks.tasks import AcquireLock
         import_session = upload_session.import_session
-        lock_id = f'{upload_session.name}-{import_session.id}'
-        with AcquireLock(lock_id) as lock:
-            if lock.acquire() is True:
-                _log('Checking session %s validity', import_session.id)
-                if not check_import_session_is_valid(
-                        req, upload_session, import_session):
-                    error_msg = upload_session.import_session.tasks[0].error_message
-                    url = "/upload/dataset_upload_invalid.html"
-                    _json_response = json_response(
-                        {
-                            'url': url,
-                            'status': 'error',
-                            'id': import_session.id,
-                            'error_msg': error_msg or 'Import Session is Invalid!',
-                            'success': False
-                        }
-                    )
-                    return _json_response
-                else:
-                    try:
-                        dataset_id = None
-                        if req and 'dataset_id' in req.GET:
-                            dataset = Dataset.objects.filter(id=req.GET['dataset_id'])
-                            if dataset.exists():
-                                dataset_id = dataset.first().resourcebase_ptr_id
-
-                        saved_dataset = final_step(upload_session, upload_session.user, dataset_id)
-
-                        assert saved_dataset
-
-                        # this response is different then all of the other views in the
-                        # upload as it does not return a response as a json object
-                        _json_response = json_response(
-                            {
-                                'status': 'finished',
-                                'id': import_session.id,
-                                'url': saved_dataset.get_absolute_url(),
-                                'bbox': saved_dataset.bbox_string,
-                                'crs': {
-                                    'type': 'name',
-                                    'properties': saved_dataset.srid
-                                },
-                                'success': True
-                            }
-                        )
-                        register_event(req, EventType.EVENT_UPLOAD, saved_dataset)
-                        return _json_response
-                    except (LayerNotReady, AssertionError):
-                        force_ajax = '&force_ajax=true' if req and 'force_ajax' in req.GET and req.GET['force_ajax'] == 'true' else ''
-                        return json_response(
-                            {
-                                'status': 'pending',
-                                'success': True,
-                                'id': import_session.id,
-                                'redirect_to': f"/upload/final?id={import_session.id}{force_ajax}"
-                            }
-                        )
-                    except Exception as e:
-                        logger.exception(e)
-                        url = "upload/dataset_upload_invalid.html"
-                        _json_response = json_response(
-                            {
-                                'status': 'error',
-                                'url': url,
-                                'error_msg': str(e),
-                                'success': False
-                            }
-                        )
-                        return _json_response
+        _log('Checking session %s validity', import_session.id)
+        if not check_import_session_is_valid(
+                req, upload_session, import_session):
+            error_msg = upload_session.import_session.tasks[0].error_message
+            url = "/upload/dataset_upload_invalid.html"
+            _json_response = json_response(
+                {
+                    'url': url,
+                    'status': 'error',
+                    'id': import_session.id,
+                    'error_msg': error_msg or 'Import Session is Invalid!',
+                    'success': False
+                }
+            )
+            return _json_response
+        else:
+            try:
+                dataset_id = None
+                if req and 'dataset_id' in req.GET:
+                    dataset = Dataset.objects.filter(id=req.GET['dataset_id'])
+                    if dataset.exists():
+                        dataset_id = dataset.first().resourcebase_ptr_id
+
+                saved_dataset = final_step(upload_session, upload_session.user, dataset_id)
+
+                assert saved_dataset
+
+                # this response is different from all of the other views in the
+                # upload as it does not return a response as a json object
+                _json_response = json_response(
+                    {
+                        'status': 'finished',
+                        'id': import_session.id,
+                        'url': saved_dataset.get_absolute_url(),
+                        'bbox': saved_dataset.bbox_string,
+                        'crs': {
+                            'type': 'name',
+                            'properties': saved_dataset.srid
+                        },
+                        'success': True
+                    }
+                )
+                register_event(req, EventType.EVENT_UPLOAD, saved_dataset)
+                return _json_response
+            except (LayerNotReady, AssertionError):
+                force_ajax = '&force_ajax=true' if req and 'force_ajax' in req.GET and req.GET['force_ajax'] == 'true' else ''
+                return json_response(
+                    {
+                        'status': 'pending',
+                        'success': True,
+                        'id': import_session.id,
+                        'redirect_to': f"/upload/final?id={import_session.id}{force_ajax}"
+                    }
+                )
+            except Exception as e:
+                logger.exception(e)
+                url = "upload/dataset_upload_invalid.html"
+                _json_response = json_response(
+                    {
+                        'status': 'error',
+                        'url': url,
+                        'error_msg': str(e),
+                        'success': False
+                    }
+                )
+                return _json_response
         return None
     else:
         url = "upload/dataset_upload_invalid.html"