From 725c2e22bf076aef3d38bc02032db0c359ea7aca Mon Sep 17 00:00:00 2001 From: afabiani Date: Sun, 10 Apr 2022 13:36:15 +0200 Subject: [PATCH] [Fixes #9064] Improve Upload Workflow resources state management --- geonode/base/api/views.py | 1 + geonode/geoserver/tasks.py | 130 +++--- geonode/monitoring/utils.py | 94 ++--- geonode/resource/api/tasks.py | 117 +++--- geonode/tasks/tasks.py | 9 +- geonode/upload/tasks.py | 117 +++--- geonode/upload/upload.py | 716 +++++++++++++++++----------------- 7 files changed, 612 insertions(+), 572 deletions(-) diff --git a/geonode/base/api/views.py b/geonode/base/api/views.py index 8395fa92204..928f4086fba 100644 --- a/geonode/base/api/views.py +++ b/geonode/base/api/views.py @@ -225,6 +225,7 @@ class HierarchicalKeywordViewSet(WithDynamicViewSetMixin, ListModelMixin, Retrie """ API endpoint that lists hierarchical keywords. """ + def get_queryset(self): resource_keywords = HierarchicalKeyword.resource_keywords_tree(self.request.user) slugs = [obj.get('href') for obj in resource_keywords] diff --git a/geonode/geoserver/tasks.py b/geonode/geoserver/tasks.py index fccb3ce1fec..4b4fb8118e1 100644 --- a/geonode/geoserver/tasks.py +++ b/geonode/geoserver/tasks.py @@ -67,7 +67,10 @@ def geoserver_update_datasets(self, *args, **kwargs): lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True: - return gs_slurp(*args, **kwargs) + try: + return gs_slurp(*args, **kwargs) + finally: + lock.release() @app.task( @@ -109,6 +112,8 @@ def geoserver_set_style( base_file=base_file) except Exception as e: logger.exception(e) + finally: + lock.release() @app.task( @@ -144,48 +149,51 @@ def geoserver_create_style( lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True and instance: - f = None - if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK): - if os.path.isfile(sld_file): - try: - f = open(sld_file) - except Exception: - pass - elif tempdir and os.path.exists(tempdir): - if os.path.isfile(os.path.join(tempdir, sld_file)): + try: + f = None + if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK): + if os.path.isfile(sld_file): try: - f = open(os.path.join(tempdir, sld_file)) + f = open(sld_file) except Exception: pass - if f: - sld = f.read() - f.close() - if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE): - style = gs_catalog.create_style( - name, - sld, - raw=True, - workspace=settings.DEFAULT_WORKSPACE) - gs_dataset = gs_catalog.get_layer(name) - _default_style = gs_dataset.default_style - gs_dataset.default_style = style - gs_catalog.save(gs_dataset) - set_styles(instance, gs_catalog) - try: - gs_catalog.delete(_default_style) - Link.objects.filter( - resource=instance.resourcebase_ptr, - name='Legend', - url__contains=f'STYLE={_default_style.name}').delete() - except Exception as e: - logger.exception(e) + elif tempdir and os.path.exists(tempdir): + if os.path.isfile(os.path.join(tempdir, sld_file)): + try: + f = open(os.path.join(tempdir, sld_file)) + except Exception: + pass + if f: + sld = f.read() + f.close() + if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE): + style = gs_catalog.create_style( + name, + sld, + raw=True, + workspace=settings.DEFAULT_WORKSPACE) + gs_dataset = gs_catalog.get_layer(name) + _default_style = gs_dataset.default_style + gs_dataset.default_style = style + gs_catalog.save(gs_dataset) + set_styles(instance, gs_catalog) + try: + gs_catalog.delete(_default_style) + 
Link.objects.filter( + resource=instance.resourcebase_ptr, + name='Legend', + url__contains=f'STYLE={_default_style.name}').delete() + except Exception as e: + logger.exception(e) + else: + get_sld_for(gs_catalog, instance) else: get_sld_for(gs_catalog, instance) - else: - get_sld_for(gs_catalog, instance) - if not f: - # this signal is used by the mapstore client to set the style in visual mode - geoserver_automatic_default_style_set.send_robust(sender=instance, instance=instance) + if not f: + # this signal is used by the mapstore client to set the style in visual mode + geoserver_automatic_default_style_set.send_robust(sender=instance, instance=instance) + finally: + lock.release() @app.task( @@ -211,11 +219,14 @@ def geoserver_post_save_datasets( lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True: - sync_instance_with_geoserver(instance_id, *args, **kwargs) + try: + sync_instance_with_geoserver(instance_id, *args, **kwargs) - # Updating HAYSTACK Indexes if needed - if settings.HAYSTACK_SEARCH: - call_command('update_index') + # Updating HAYSTACK Indexes if needed + if settings.HAYSTACK_SEARCH: + call_command('update_index') + finally: + lock.release() @app.task( @@ -245,15 +256,18 @@ def geoserver_create_thumbnail(self, instance_id, overwrite=True, check_bbox=Tru lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True: - instance.set_processing_state(enumerations.STATE_RUNNING) try: - instance.set_dirty_state() - create_gs_thumbnail(instance, overwrite=overwrite, check_bbox=check_bbox) - logger.debug(f"... Created Thumbnail for Dataset {instance.title}") - except Exception as e: - geoserver_create_thumbnail.retry(exc=e) + instance.set_processing_state(enumerations.STATE_RUNNING) + try: + instance.set_dirty_state() + create_gs_thumbnail(instance, overwrite=overwrite, check_bbox=check_bbox) + logger.debug(f"... 
Created Thumbnail for Dataset {instance.title}") + except Exception as e: + geoserver_create_thumbnail.retry(exc=e) + finally: + instance.set_processing_state(enumerations.STATE_PROCESSED) finally: - instance.set_processing_state(enumerations.STATE_PROCESSED) + lock.release() @app.task( @@ -276,7 +290,10 @@ def geoserver_cascading_delete(self, *args, **kwargs): lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True: - return cascading_delete(*args, **kwargs) + try: + return cascading_delete(*args, **kwargs) + finally: + lock.release() @app.task( @@ -300,12 +317,15 @@ def geoserver_delete_map(self, object_id): with AcquireLock(lock_id) as lock: if lock.acquire() is True: try: - map_obj = Map.objects.get(id=object_id) - except Map.DoesNotExist: - return + try: + map_obj = Map.objects.get(id=object_id) + except Map.DoesNotExist: + return - map_obj.maplayers.all().delete() - map_obj.delete() + map_obj.maplayers.all().delete() + map_obj.delete() + finally: + lock.release() @shared_task( diff --git a/geonode/monitoring/utils.py b/geonode/monitoring/utils.py index 7c050c96f11..279d9349a07 100644 --- a/geonode/monitoring/utils.py +++ b/geonode/monitoring/utils.py @@ -418,53 +418,57 @@ def collect_metric(**options): log.info(f'[{lock_id}] Collecting Metrics - started @ {_start_time}') with AcquireLock(lock_id) as lock: if lock.acquire() is True: - log.info(f'[{lock_id}] Collecting Metrics - [...acquired lock] @ {_start_time}') try: - oservice = options['service'] - if not oservice: - services = Service.objects.all() - else: - services = [oservice] - if options['list_services']: - print('available services') + log.info(f'[{lock_id}] Collecting Metrics - [...acquired lock] @ {_start_time}') + try: + oservice = options['service'] + if not oservice: + services = Service.objects.all() + else: + services = [oservice] + if options['list_services']: + print('available services') + for s in services: + print(' ', s.name, '(', s.url, ')') + print(' type', s.service_type.name) + print(' running on', s.host.name, s.host.ip) + print(' active:', s.active) + if s.last_check: + print(' last check:', s.last_check) + else: + print(' not checked yet') + print(' ') + return + c = CollectorAPI() for s in services: - print(' ', s.name, '(', s.url, ')') - print(' type', s.service_type.name) - print(' running on', s.host.name, s.host.ip) - print(' active:', s.active) - if s.last_check: - print(' last check:', s.last_check) - else: - print(' not checked yet') - print(' ') - return - c = CollectorAPI() - for s in services: - try: - run_check( - s, - collector=c, - since=options['since'], - until=options['until'], - force_check=options['force_check'], - format=options['format']) - except Exception as e: - log.warning(e) - if not options['do_not_clear']: - log.info("Clearing old data") - c.clear_old_data() - if options['emit_notifications']: - log.info("Processing notifications for %s", options['until']) - # s = Service.objects.first() - # interval = s.check_interval - # now = datetime.utcnow().replace(tzinfo=pytz.utc) - # notifications_check = now - interval - c.emit_notifications() # notifications_check)) - _end_time = datetime.utcnow().isoformat() - log.info(f'[{lock_id}] Collecting Metrics - finished @ {_end_time}') - except Exception as e: - log.info(f'[{lock_id}] Collecting Metrics - errored @ {_end_time}') - log.exception(e) + try: + run_check( + s, + collector=c, + since=options['since'], + until=options['until'], + force_check=options['force_check'], + format=options['format']) + 
except Exception as e: + log.warning(e) + if not options['do_not_clear']: + log.info("Clearing old data") + c.clear_old_data() + if options['emit_notifications']: + log.info("Processing notifications for %s", options['until']) + # s = Service.objects.first() + # interval = s.check_interval + # now = datetime.utcnow().replace(tzinfo=pytz.utc) + # notifications_check = now - interval + c.emit_notifications() # notifications_check)) + _end_time = datetime.utcnow().isoformat() + log.info(f'[{lock_id}] Collecting Metrics - finished @ {_end_time}') + except Exception as e: + log.info(f'[{lock_id}] Collecting Metrics - errored @ {_end_time}') + log.exception(e) + finally: + lock.release() + log.info(f'[{lock_id}] Collecting Metrics - exit @ {_end_time}') return (_start_time, _end_time) diff --git a/geonode/resource/api/tasks.py b/geonode/resource/api/tasks.py index 509ec2d1f8a..6a72c882cef 100644 --- a/geonode/resource/api/tasks.py +++ b/geonode/resource/api/tasks.py @@ -92,74 +92,77 @@ def resouce_service_dispatcher(self, execution_id: str): """ with AcquireLock(execution_id) as lock: if lock.acquire() is True: - _exec_request = ExecutionRequest.objects.filter(exec_id=execution_id) - if _exec_request.exists(): - _request = _exec_request.get() - if _request.status == ExecutionRequest.STATUS_READY: - _exec_request.update( - status=ExecutionRequest.STATUS_RUNNING - ) - _request.refresh_from_db() - if hasattr(resource_manager, _request.func_name): - try: - _signature = signature(getattr(resource_manager, _request.func_name)) - _args = [] - _kwargs = {} - for _param_name in _signature.parameters: - if _request.input_params and _request.input_params.get(_param_name, None): - _param = _signature.parameters.get(_param_name) - _param_value = _get_param_value( - _param, _request.input_params.get(_param_name)) - if _param.kind == Parameter.POSITIONAL_ONLY: - _args.append(_param_value) - else: - _kwargs[_param_name] = _param_value + try: + _exec_request = ExecutionRequest.objects.filter(exec_id=execution_id) + if _exec_request.exists(): + _request = _exec_request.get() + if _request.status == ExecutionRequest.STATUS_READY: + _exec_request.update( + status=ExecutionRequest.STATUS_RUNNING + ) + _request.refresh_from_db() + if hasattr(resource_manager, _request.func_name): + try: + _signature = signature(getattr(resource_manager, _request.func_name)) + _args = [] + _kwargs = {} + for _param_name in _signature.parameters: + if _request.input_params and _request.input_params.get(_param_name, None): + _param = _signature.parameters.get(_param_name) + _param_value = _get_param_value( + _param, _request.input_params.get(_param_name)) + if _param.kind == Parameter.POSITIONAL_ONLY: + _args.append(_param_value) + else: + _kwargs[_param_name] = _param_value - _bindings = _signature.bind(*_args, **_kwargs) - _bindings.apply_defaults() + _bindings = _signature.bind(*_args, **_kwargs) + _bindings.apply_defaults() - _output = getattr(resource_manager, _request.func_name)(*_bindings.args, **_bindings.kwargs) - _output_params = {} - if _output is not None and _signature.return_annotation != Signature.empty: - if _signature.return_annotation.__module__ == 'builtins': + _output = getattr(resource_manager, _request.func_name)(*_bindings.args, **_bindings.kwargs) + _output_params = {} + if _output is not None and _signature.return_annotation != Signature.empty: + if _signature.return_annotation.__module__ == 'builtins': + _output_params = { + "output": _output + } + elif _signature.return_annotation == ResourceBase or 
isinstance(_output, ResourceBase): + _output_params = { + "output": { + "uuid": _output.uuid + } + } + else: _output_params = { - "output": _output + "output": None } - elif _signature.return_annotation == ResourceBase or isinstance(_output, ResourceBase): - _output_params = { - "output": { - "uuid": _output.uuid - } + _exec_request.update( + status=ExecutionRequest.STATUS_FINISHED, + finished=datetime.now(), + output_params=_output_params + ) + _request.refresh_from_db() + except Exception as e: + logger.exception(e) + _exec_request.update( + status=ExecutionRequest.STATUS_FAILED, + finished=datetime.now(), + output_params={ + "error": _(f"Error occurred while executin the operation: '{_request.func_name}'"), + "exception": str(e) } - else: - _output_params = { - "output": None - } - _exec_request.update( - status=ExecutionRequest.STATUS_FINISHED, - finished=datetime.now(), - output_params=_output_params - ) - _request.refresh_from_db() - except Exception as e: - logger.exception(e) + ) + _request.refresh_from_db() + else: _exec_request.update( status=ExecutionRequest.STATUS_FAILED, finished=datetime.now(), output_params={ - "error": _(f"Error occurred while executin the operation: '{_request.func_name}'"), - "exception": str(e) + "error": _(f"Could not find the operation name: '{_request.func_name}'") } ) _request.refresh_from_db() - else: - _exec_request.update( - status=ExecutionRequest.STATUS_FAILED, - finished=datetime.now(), - output_params={ - "error": _(f"Could not find the operation name: '{_request.func_name}'") - } - ) - _request.refresh_from_db() + finally: + lock.release() logger.debug(f"WARNING: The requested ExecutionRequest with 'exec_id'={execution_id} was not found!") diff --git a/geonode/tasks/tasks.py b/geonode/tasks/tasks.py index 75fdfda939a..19178771732 100644 --- a/geonode/tasks/tasks.py +++ b/geonode/tasks/tasks.py @@ -94,6 +94,9 @@ def __enter__(self): self.lock = memcache_lock(self.lock_id) return self + def __exit__(self, exc_type, exc_value, exc_traceback): + self.release() + def acquire(self): if settings.ASYNC_SIGNALS: try: @@ -103,12 +106,12 @@ def acquire(self): logger.warning(e) return True - def __exit__(self, exc_type, exc_value, exc_traceback): + def release(self): if self.lock: try: self.lock.release() - except Exception: - pass + except Exception as e: + logger.exception(e) class FaultTolerantTask(celery.Task): diff --git a/geonode/upload/tasks.py b/geonode/upload/tasks.py index deed7b016a8..1fe48271ee4 100644 --- a/geonode/upload/tasks.py +++ b/geonode/upload/tasks.py @@ -63,82 +63,85 @@ def finalize_incomplete_session_uploads(self, *args, **kwargs): lock_id = f'{self.request.id}' with AcquireLock(lock_id) as lock: if lock.acquire() is True: - with transaction.atomic(): - _upload_ids = [] - _upload_tasks = [] + try: + with transaction.atomic(): + _upload_ids = [] + _upload_tasks = [] - # Check first if we need to delete stale sessions - expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS) - for _upload in Upload.objects.exclude(state=enumerations.STATE_PROCESSED).exclude(date__gt=expiry_time): - _upload.set_processing_state(enumerations.STATE_INVALID) - _upload_ids.append(_upload.id) - _upload_tasks.append( - _upload_session_cleanup.signature( - args=(_upload.id,) + # Check first if we need to delete stale sessions + expiry_time = now() - timedelta(hours=UPLOAD_SESSION_EXPIRY_HOURS) + for _upload in Upload.objects.exclude(state=enumerations.STATE_PROCESSED).exclude(date__gt=expiry_time): + 
_upload.set_processing_state(enumerations.STATE_INVALID) + _upload_ids.append(_upload.id) + _upload_tasks.append( + _upload_session_cleanup.signature( + args=(_upload.id,) + ) ) - ) - upload_workflow_finalizer = _upload_workflow_finalizer.signature( - args=('_upload_session_cleanup', _upload_ids,), - immutable=True - ).on_error( - _upload_workflow_error.signature( + upload_workflow_finalizer = _upload_workflow_finalizer.signature( args=('_upload_session_cleanup', _upload_ids,), immutable=True + ).on_error( + _upload_workflow_error.signature( + args=('_upload_session_cleanup', _upload_ids,), + immutable=True + ) ) - ) - upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer) - upload_workflow.apply_async() + upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer) + upload_workflow.apply_async() - # Let's finish the valid ones - _exclusion_processing_states = ( - enumerations.STATE_COMPLETE, - enumerations.STATE_PROCESSED) - for _upload in Upload.objects.exclude(state__in=_exclusion_processing_states).exclude(id__in=_upload_ids): - session = None - try: - if not _upload.import_id: - raise NotFound - session = _upload.get_session.import_session if _upload.get_session else None - if not session or session.state != enumerations.STATE_COMPLETE: - session = gs_uploader.get_session(_upload.import_id) - except (NotFound, Exception) as e: - logger.exception(e) + # Let's finish the valid ones + _exclusion_processing_states = ( + enumerations.STATE_COMPLETE, + enumerations.STATE_PROCESSED) + for _upload in Upload.objects.exclude(state__in=_exclusion_processing_states).exclude(id__in=_upload_ids): session = None + try: + if not _upload.import_id: + raise NotFound + session = _upload.get_session.import_session if _upload.get_session else None + if not session or session.state != enumerations.STATE_COMPLETE: + session = gs_uploader.get_session(_upload.import_id) + except (NotFound, Exception) as e: + logger.exception(e) + session = None - if session: - _upload_ids.append(_upload.id) - _upload_tasks.append( - _update_upload_session_state.signature( - args=(_upload.id,) - ) - ) - else: - if _upload.state not in (enumerations.STATE_READY, enumerations.STATE_COMPLETE, enumerations.STATE_PROCESSED): - _upload.set_processing_state(enumerations.STATE_INVALID) + if session: _upload_ids.append(_upload.id) _upload_tasks.append( - _upload_session_cleanup.signature( + _update_upload_session_state.signature( args=(_upload.id,) ) ) + else: + if _upload.state not in (enumerations.STATE_READY, enumerations.STATE_COMPLETE, enumerations.STATE_PROCESSED): + _upload.set_processing_state(enumerations.STATE_INVALID) + _upload_ids.append(_upload.id) + _upload_tasks.append( + _upload_session_cleanup.signature( + args=(_upload.id,) + ) + ) - upload_workflow_finalizer = _upload_workflow_finalizer.signature( - args=('_update_upload_session_state', _upload_ids,), - immutable=True - ).on_error( - _upload_workflow_error.signature( + upload_workflow_finalizer = _upload_workflow_finalizer.signature( args=('_update_upload_session_state', _upload_ids,), immutable=True + ).on_error( + _upload_workflow_error.signature( + args=('_update_upload_session_state', _upload_ids,), + immutable=True + ) ) - ) - upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer) - result = upload_workflow.apply_async() - if result.ready(): - with allow_join_result(): - return result.get() - return result.state + upload_workflow = chord(_upload_tasks, body=upload_workflow_finalizer) + result = 
upload_workflow.apply_async() + if result.ready(): + with allow_join_result(): + return result.get() + return result.state + finally: + lock.release() @app.task( diff --git a/geonode/upload/upload.py b/geonode/upload/upload.py index 8a6faa61054..003e9eced36 100644 --- a/geonode/upload/upload.py +++ b/geonode/upload/upload.py @@ -281,143 +281,146 @@ def save_step(user, layer, spatial_files, overwrite=True, store_spatial_files=Tr lock_id = 'upload-workflow-save_step' with AcquireLock(lock_id, blocking=True) as lock: if lock.acquire() is True: - logger.debug( - f'Uploading layer: {layer}, files {spatial_files}') - if len(spatial_files) > 1: - # we only support more than one file if they're rasters for mosaicing - if not all( - [f.file_type.dataset_type == 'coverage' for f in spatial_files]): - msg = "Please upload only one type of file at a time" + try: + logger.debug( + f'Uploading layer: {layer}, files {spatial_files}') + if len(spatial_files) > 1: + # we only support more than one file if they're rasters for mosaicing + if not all( + [f.file_type.dataset_type == 'coverage' for f in spatial_files]): + msg = "Please upload only one type of file at a time" + logger.exception(Exception(msg)) + raise GeneralUploadException(detail=msg) + name = get_valid_dataset_name(layer, overwrite) + logger.debug(f'Name for layer: {name}') + if not any(spatial_files.all_files()): + msg = "Unable to recognize the uploaded file(s)" logger.exception(Exception(msg)) raise GeneralUploadException(detail=msg) - name = get_valid_dataset_name(layer, overwrite) - logger.debug(f'Name for layer: {name}') - if not any(spatial_files.all_files()): - msg = "Unable to recognize the uploaded file(s)" - logger.exception(Exception(msg)) - raise GeneralUploadException(detail=msg) - the_dataset_type = get_dataset_type(spatial_files) - _check_geoserver_store(name, the_dataset_type, overwrite) - if the_dataset_type not in ( - FeatureType.resource_type, - Coverage.resource_type): - msg = f"Expected layer type to FeatureType or Coverage, not {the_dataset_type}" - logger.exception(Exception(msg)) - raise GeneralUploadException(msg) - files_to_upload = preprocess_files(spatial_files) - logger.debug(f"files_to_upload: {files_to_upload}") - logger.debug(f'Uploading {the_dataset_type}') - error_msg = None - try: - next_id = _get_next_id() - # Truncate name to maximum length defined by the field. - max_length = Upload._meta.get_field('name').max_length - name = name[:max_length] - # save record of this whether valid or not - will help w/ debugging - upload, _ = Upload.objects.get_or_create( - user=user, - name=name, - state=enumerations.STATE_READY, - upload_dir=spatial_files.dirname - ) - upload.store_spatial_files = store_spatial_files - - # @todo settings for use_url or auto detection if geoserver is - # on same host - - # Is it a regular file or an ImageMosaic? 
- # if mosaic_time_regex and mosaic_time_value: - if mosaic: # we want to ingest as ImageMosaic - target_store, files_to_upload = utils.import_imagemosaic_granules( - spatial_files, - append_to_mosaic_opts, - append_to_mosaic_name, - mosaic_time_regex, - mosaic_time_value, - time_presentation, - time_presentation_res, - time_presentation_default_value, - time_presentation_reference_value) - upload.mosaic = mosaic - upload.append_to_mosaic_opts = append_to_mosaic_opts - upload.append_to_mosaic_name = append_to_mosaic_name - upload.mosaic_time_regex = mosaic_time_regex - upload.mosaic_time_value = mosaic_time_value - # moving forward with a regular Importer session - if len(files_to_upload) > 1: - import_session = gs_uploader.upload_files( - files_to_upload[1:], - use_url=False, - # import_id=next_id, - target_store=target_store, - charset_encoding=charset_encoding - ) + the_dataset_type = get_dataset_type(spatial_files) + _check_geoserver_store(name, the_dataset_type, overwrite) + if the_dataset_type not in ( + FeatureType.resource_type, + Coverage.resource_type): + msg = f"Expected layer type to FeatureType or Coverage, not {the_dataset_type}" + logger.exception(Exception(msg)) + raise GeneralUploadException(msg) + files_to_upload = preprocess_files(spatial_files) + logger.debug(f"files_to_upload: {files_to_upload}") + logger.debug(f'Uploading {the_dataset_type}') + error_msg = None + try: + next_id = _get_next_id() + # Truncate name to maximum length defined by the field. + max_length = Upload._meta.get_field('name').max_length + name = name[:max_length] + # save record of this whether valid or not - will help w/ debugging + upload, _ = Upload.objects.get_or_create( + user=user, + name=name, + state=enumerations.STATE_READY, + upload_dir=spatial_files.dirname + ) + upload.store_spatial_files = store_spatial_files + + # @todo settings for use_url or auto detection if geoserver is + # on same host + + # Is it a regular file or an ImageMosaic? 
+ # if mosaic_time_regex and mosaic_time_value: + if mosaic: # we want to ingest as ImageMosaic + target_store, files_to_upload = utils.import_imagemosaic_granules( + spatial_files, + append_to_mosaic_opts, + append_to_mosaic_name, + mosaic_time_regex, + mosaic_time_value, + time_presentation, + time_presentation_res, + time_presentation_default_value, + time_presentation_reference_value) + upload.mosaic = mosaic + upload.append_to_mosaic_opts = append_to_mosaic_opts + upload.append_to_mosaic_name = append_to_mosaic_name + upload.mosaic_time_regex = mosaic_time_regex + upload.mosaic_time_value = mosaic_time_value + # moving forward with a regular Importer session + if len(files_to_upload) > 1: + import_session = gs_uploader.upload_files( + files_to_upload[1:], + use_url=False, + # import_id=next_id, + target_store=target_store, + charset_encoding=charset_encoding + ) + else: + import_session = gs_uploader.upload_files( + files_to_upload, + use_url=False, + # import_id=next_id, + target_store=target_store, + charset_encoding=charset_encoding + ) + next_id = import_session.id if import_session else None + if not next_id: + error_msg = 'No valid Importer Session could be found' else: + # moving forward with a regular Importer session import_session = gs_uploader.upload_files( files_to_upload, use_url=False, - # import_id=next_id, + import_id=next_id, + mosaic=False, target_store=target_store, + name=name, charset_encoding=charset_encoding ) - next_id = import_session.id if import_session else None - if not next_id: - error_msg = 'No valid Importer Session could be found' - else: - # moving forward with a regular Importer session - import_session = gs_uploader.upload_files( - files_to_upload, - use_url=False, - import_id=next_id, - mosaic=False, - target_store=target_store, - name=name, - charset_encoding=charset_encoding - ) - upload.import_id = import_session.id - upload.save() - - # any unrecognized tasks/files must be deleted or we can't proceed - import_session.delete_unrecognized_tasks() - - if not mosaic: - if not import_session.tasks: - error_msg = 'No valid upload files could be found' - if import_session.tasks: - if import_session.tasks[0].state == 'NO_FORMAT' \ - or import_session.tasks[0].state == 'BAD_FORMAT': - error_msg = 'There may be a problem with the data provided - ' \ - 'we could not identify its format' - - if not mosaic and len(import_session.tasks) > 1: - error_msg = "Only a single upload is supported at the moment" - - if not error_msg and import_session.tasks: - task = import_session.tasks[0] - # single file tasks will have just a file entry - if hasattr(task, 'files'): - # @todo gsimporter - test this - if not all([hasattr(f, 'timestamp') - for f in task.source.files]): - error_msg = ( - "Not all timestamps could be recognized." 
- "Please ensure your files contain the correct formats.") + upload.import_id = import_session.id + upload.save() + + # any unrecognized tasks/files must be deleted or we can't proceed + import_session.delete_unrecognized_tasks() + + if not mosaic: + if not import_session.tasks: + error_msg = 'No valid upload files could be found' + if import_session.tasks: + if import_session.tasks[0].state == 'NO_FORMAT' \ + or import_session.tasks[0].state == 'BAD_FORMAT': + error_msg = 'There may be a problem with the data provided - ' \ + 'we could not identify its format' + + if not mosaic and len(import_session.tasks) > 1: + error_msg = "Only a single upload is supported at the moment" + + if not error_msg and import_session.tasks: + task = import_session.tasks[0] + # single file tasks will have just a file entry + if hasattr(task, 'files'): + # @todo gsimporter - test this + if not all([hasattr(f, 'timestamp') + for f in task.source.files]): + error_msg = ( + "Not all timestamps could be recognized." + "Please ensure your files contain the correct formats.") + + if error_msg: + upload.set_processing_state(enumerations.STATE_INVALID) + + # @todo once the random tmp9723481758915 type of name is not + # around, need to track the name computed above, for now, the + # target store name can be used + except Exception as e: + logger.exception(e) + raise e if error_msg: - upload.set_processing_state(enumerations.STATE_INVALID) - - # @todo once the random tmp9723481758915 type of name is not - # around, need to track the name computed above, for now, the - # target store name can be used - except Exception as e: - logger.exception(e) - raise e - - if error_msg: - logger.exception(Exception(error_msg)) - raise GeneralUploadException(detail=error_msg) - else: - _log("Finished upload of [%s] to GeoServer without errors.", name) + logger.exception(Exception(error_msg)) + raise GeneralUploadException(detail=error_msg) + else: + _log("Finished upload of [%s] to GeoServer without errors.", name) + finally: + lock.release() return import_session, upload @@ -579,255 +582,258 @@ def final_step(upload_session, user, charset="UTF-8", dataset_id=None): lock_id = f'{upload_session.name}-{import_session.id}' with AcquireLock(lock_id, blocking=False) as lock: if lock.acquire() is True: - _log(f'Reloading session {import_id} to check validity') try: - import_session = import_session.reload() - except gsimporter.api.NotFound as e: - logger.exception(e) - Upload.objects.invalidate_from_session(upload_session) - raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0]) + _log(f'Reloading session {import_id} to check validity') + try: + import_session = import_session.reload() + except gsimporter.api.NotFound as e: + logger.exception(e) + Upload.objects.invalidate_from_session(upload_session) + raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0]) - if Upload.objects.filter(import_id=import_id).count(): - Upload.objects.filter(import_id=import_id).update(complete=False) - upload = Upload.objects.filter(import_id=import_id).get() - if upload.state == enumerations.STATE_RUNNING: - return + if Upload.objects.filter(import_id=import_id).count(): + Upload.objects.filter(import_id=import_id).update(complete=False) + upload = Upload.objects.filter(import_id=import_id).get() + if upload.state == enumerations.STATE_RUNNING: + return - upload_session.import_session = import_session - Upload.objects.update_from_session(upload_session) + 
upload_session.import_session = import_session + Upload.objects.update_from_session(upload_session) - # Create the style and assign it to the created resource - # FIXME: Put this in gsconfig.py - task = import_session.tasks[0] - task.set_charset(charset) + # Create the style and assign it to the created resource + # FIXME: Put this in gsconfig.py + task = import_session.tasks[0] + task.set_charset(charset) - # @todo see above in save_step, regarding computed unique name - name = task.layer.name + # @todo see above in save_step, regarding computed unique name + name = task.layer.name - if dataset_id: - name = Dataset.objects.get(resourcebase_ptr_id=dataset_id).name + if dataset_id: + name = Dataset.objects.get(resourcebase_ptr_id=dataset_id).name - _log(f'Getting from catalog [{name}]') - try: - # the importer chooses an available featuretype name late in the game need - # to verify the resource.name otherwise things will fail. This happens - # when the same data is uploaded a second time and the default name is - # chosen - gs_catalog.get_layer(name) - except Exception: - Upload.objects.invalidate_from_session(upload_session) - raise LayerNotReady( - _(f"Expected to find layer named '{name}' in geoserver")) - - if import_session.state == 'READY' or (import_session.state == 'PENDING' and task.state == 'READY'): - import_session.commit() - elif import_session.state == 'INCOMPLETE' and task.state != 'ERROR': - Upload.objects.invalidate_from_session(upload_session) - raise Exception(f'unknown item state: {task.state}') - try: - import_session = import_session.reload() - except gsimporter.api.NotFound as e: - logger.exception(e) - Upload.objects.invalidate_from_session(upload_session) - raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0]) - upload_session.import_session = import_session - Upload.objects.update_from_session(upload_session) - - _log(f'Creating Django record for [{name}]') - target = task.target - alternate = task.get_target_layer_name() - dataset_uuid = None - title = upload_session.dataset_title - abstract = upload_session.dataset_abstract - - metadata_uploaded = False - xml_file = upload_session.base_file[0].xml_files - if xml_file: + _log(f'Getting from catalog [{name}]') + try: + # the importer chooses an available featuretype name late in the game need + # to verify the resource.name otherwise things will fail. 
This happens + # when the same data is uploaded a second time and the default name is + # chosen + gs_catalog.get_layer(name) + except Exception: + Upload.objects.invalidate_from_session(upload_session) + raise LayerNotReady( + _(f"Expected to find layer named '{name}' in geoserver")) + + if import_session.state == 'READY' or (import_session.state == 'PENDING' and task.state == 'READY'): + import_session.commit() + elif import_session.state == 'INCOMPLETE' and task.state != 'ERROR': + Upload.objects.invalidate_from_session(upload_session) + raise Exception(f'unknown item state: {task.state}') try: - # get model properties from XML + import_session = import_session.reload() + except gsimporter.api.NotFound as e: + logger.exception(e) + Upload.objects.invalidate_from_session(upload_session) + raise GeneralUploadException(detail=_("The GeoServer Import Session is no more available ") + e.args[0]) + upload_session.import_session = import_session + Upload.objects.update_from_session(upload_session) + + _log(f'Creating Django record for [{name}]') + target = task.target + alternate = task.get_target_layer_name() + dataset_uuid = None + title = upload_session.dataset_title + abstract = upload_session.dataset_abstract + + metadata_uploaded = False + xml_file = upload_session.base_file[0].xml_files + if xml_file: + try: + # get model properties from XML + # If it's contained within a zip, need to extract it + if upload_session.base_file.archive: + archive = upload_session.base_file.archive + zf = zipfile.ZipFile(archive, 'r', allowZip64=True) + zf.extract(xml_file[0], os.path.dirname(archive)) + # Assign the absolute path to this file + xml_file = f"{os.path.dirname(archive)}/{xml_file[0]}" + + # Sanity checks + if isinstance(xml_file, list): + if len(xml_file) > 0: + xml_file = xml_file[0] + else: + xml_file = None + elif not isinstance(xml_file, str): + xml_file = None + + if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK): + dataset_uuid, vals, regions, keywords, custom = parse_metadata( + open(xml_file).read()) + metadata_uploaded = True + except Exception as e: + Upload.objects.invalidate_from_session(upload_session) + logger.error(e) + raise GeoNodeException( + _("Exception occurred while parsing the provided Metadata file."), e) + + # look for SLD + sld_file = upload_session.base_file[0].sld_files + sld_uploaded = False + if sld_file: # If it's contained within a zip, need to extract it if upload_session.base_file.archive: archive = upload_session.base_file.archive + logger.debug(f'using uploaded sld file from {archive}') zf = zipfile.ZipFile(archive, 'r', allowZip64=True) - zf.extract(xml_file[0], os.path.dirname(archive)) + zf.extract(sld_file[0], os.path.dirname(archive), path=upload_session.tempdir) # Assign the absolute path to this file - xml_file = f"{os.path.dirname(archive)}/{xml_file[0]}" - - # Sanity checks - if isinstance(xml_file, list): - if len(xml_file) > 0: - xml_file = xml_file[0] - else: - xml_file = None - elif not isinstance(xml_file, str): - xml_file = None - - if xml_file and os.path.exists(xml_file) and os.access(xml_file, os.R_OK): - dataset_uuid, vals, regions, keywords, custom = parse_metadata( - open(xml_file).read()) - metadata_uploaded = True - except Exception as e: + sld_file[0] = f"{os.path.dirname(archive)}/{sld_file[0]}" + else: + _sld_file = f"{os.path.dirname(upload_session.tempdir)}/{os.path.basename(sld_file[0])}" + logger.debug(f"copying [{sld_file[0]}] to [{_sld_file}]") + try: + shutil.copyfile(sld_file[0], _sld_file) + 
sld_file = _sld_file + except (IsADirectoryError, shutil.SameFileError) as e: + logger.exception(e) + sld_file = sld_file[0] + except Exception as e: + logger.exception(e) + raise GeneralUploadException(detail=_('Error uploading Dataset') + e.args[0]) + sld_uploaded = True + else: + # get_files will not find the sld if it doesn't match the base name + # so we've worked around that in the view - if provided, it will be here + if upload_session.import_sld_file: + logger.debug('using provided sld file from importer') + base_file = upload_session.base_file + sld_file = base_file[0].sld_files[0] + sld_uploaded = False + logger.debug(f'[sld_uploaded: {sld_uploaded}] sld_file: {sld_file}') + + # Make sure the layer does not exists already + if dataset_uuid and Dataset.objects.filter(uuid=dataset_uuid).count(): Upload.objects.invalidate_from_session(upload_session) - logger.error(e) + logger.error("The UUID identifier from the XML Metadata is already in use in this system.") raise GeoNodeException( - _("Exception occurred while parsing the provided Metadata file."), e) - - # look for SLD - sld_file = upload_session.base_file[0].sld_files - sld_uploaded = False - if sld_file: - # If it's contained within a zip, need to extract it - if upload_session.base_file.archive: - archive = upload_session.base_file.archive - logger.debug(f'using uploaded sld file from {archive}') - zf = zipfile.ZipFile(archive, 'r', allowZip64=True) - zf.extract(sld_file[0], os.path.dirname(archive), path=upload_session.tempdir) - # Assign the absolute path to this file - sld_file[0] = f"{os.path.dirname(archive)}/{sld_file[0]}" - else: - _sld_file = f"{os.path.dirname(upload_session.tempdir)}/{os.path.basename(sld_file[0])}" - logger.debug(f"copying [{sld_file[0]}] to [{_sld_file}]") - try: - shutil.copyfile(sld_file[0], _sld_file) - sld_file = _sld_file - except (IsADirectoryError, shutil.SameFileError) as e: - logger.exception(e) - sld_file = sld_file[0] - except Exception as e: - logger.exception(e) - raise GeneralUploadException(detail=_('Error uploading Dataset') + e.args[0]) - sld_uploaded = True - else: - # get_files will not find the sld if it doesn't match the base name - # so we've worked around that in the view - if provided, it will be here - if upload_session.import_sld_file: - logger.debug('using provided sld file from importer') - base_file = upload_session.base_file - sld_file = base_file[0].sld_files[0] - sld_uploaded = False - logger.debug(f'[sld_uploaded: {sld_uploaded}] sld_file: {sld_file}') - - # Make sure the layer does not exists already - if dataset_uuid and Dataset.objects.filter(uuid=dataset_uuid).count(): - Upload.objects.invalidate_from_session(upload_session) - logger.error("The UUID identifier from the XML Metadata is already in use in this system.") - raise GeoNodeException( - _("The UUID identifier from the XML Metadata is already in use in this system.")) - - # Is it a regular file or an ImageMosaic? - is_mosaic = False - has_time = has_elevation = False - start = end = None - if upload_session.mosaic_time_regex and upload_session.mosaic_time_value: - has_time = True - is_mosaic = True - start = datetime.datetime.strptime(upload_session.mosaic_time_value, - TIME_REGEX_FORMAT[upload_session.mosaic_time_regex]) - start = pytz.utc.localize(start, is_dst=False) - end = start - if upload_session.time and upload_session.time_info and upload_session.time_transforms: - has_time = True - - if upload_session.append_to_mosaic_opts: - # Is it a mosaic or a granule that must be added to an Image Mosaic? 
- saved_dataset_filter = Dataset.objects.filter( - name=upload_session.append_to_mosaic_name) - if not saved_dataset_filter.exists(): - saved_dataset = resource_manager.create( - name=upload_session.append_to_mosaic_name, - defaults=dict( - dirty_state=True, - state=enumerations.STATE_READY) - ) - created = True - else: - saved_dataset = saved_dataset_filter.get() - created = False - saved_dataset.set_dirty_state() - if saved_dataset.temporal_extent_start and end: - if pytz.utc.localize( - saved_dataset.temporal_extent_start, - is_dst=False) < end: - saved_dataset.temporal_extent_end = end - Dataset.objects.filter( - name=upload_session.append_to_mosaic_name).update( - temporal_extent_end=end) + _("The UUID identifier from the XML Metadata is already in use in this system.")) + + # Is it a regular file or an ImageMosaic? + is_mosaic = False + has_time = has_elevation = False + start = end = None + if upload_session.mosaic_time_regex and upload_session.mosaic_time_value: + has_time = True + is_mosaic = True + start = datetime.datetime.strptime(upload_session.mosaic_time_value, + TIME_REGEX_FORMAT[upload_session.mosaic_time_regex]) + start = pytz.utc.localize(start, is_dst=False) + end = start + if upload_session.time and upload_session.time_info and upload_session.time_transforms: + has_time = True + + if upload_session.append_to_mosaic_opts: + # Is it a mosaic or a granule that must be added to an Image Mosaic? + saved_dataset_filter = Dataset.objects.filter( + name=upload_session.append_to_mosaic_name) + if not saved_dataset_filter.exists(): + saved_dataset = resource_manager.create( + name=upload_session.append_to_mosaic_name, + defaults=dict( + dirty_state=True, + state=enumerations.STATE_READY) + ) + created = True else: - saved_dataset.temporal_extent_start = end - Dataset.objects.filter( - name=upload_session.append_to_mosaic_name).update( - temporal_extent_start=end) - else: - # The dataset is a standard one, no mosaic options enabled... - saved_dataset_filter = Dataset.objects.filter( - store=target.name, - alternate=alternate, - workspace=target.workspace_name, - name=task.layer.name) - if not saved_dataset_filter.exists(): - saved_dataset = resource_manager.create( - dataset_uuid, - resource_type=Dataset, - defaults=dict( - store=target.name, - subtype=get_dataset_storetype(target.store_type), - alternate=alternate, - workspace=target.workspace_name, - title=title, - name=task.layer.name, - abstract=abstract or _('No abstract provided'), - owner=user, - dirty_state=True, - state=enumerations.STATE_READY, - temporal_extent_start=start, - temporal_extent_end=end, - is_mosaic=is_mosaic, - has_time=has_time, - has_elevation=has_elevation, - time_regex=upload_session.mosaic_time_regex)) - created = True + saved_dataset = saved_dataset_filter.get() + created = False + saved_dataset.set_dirty_state() + if saved_dataset.temporal_extent_start and end: + if pytz.utc.localize( + saved_dataset.temporal_extent_start, + is_dst=False) < end: + saved_dataset.temporal_extent_end = end + Dataset.objects.filter( + name=upload_session.append_to_mosaic_name).update( + temporal_extent_end=end) + else: + saved_dataset.temporal_extent_start = end + Dataset.objects.filter( + name=upload_session.append_to_mosaic_name).update( + temporal_extent_start=end) else: - saved_dataset = saved_dataset_filter.get() - created = False + # The dataset is a standard one, no mosaic options enabled... 
+ saved_dataset_filter = Dataset.objects.filter( + store=target.name, + alternate=alternate, + workspace=target.workspace_name, + name=task.layer.name) + if not saved_dataset_filter.exists(): + saved_dataset = resource_manager.create( + dataset_uuid, + resource_type=Dataset, + defaults=dict( + store=target.name, + subtype=get_dataset_storetype(target.store_type), + alternate=alternate, + workspace=target.workspace_name, + title=title, + name=task.layer.name, + abstract=abstract or _('No abstract provided'), + owner=user, + dirty_state=True, + state=enumerations.STATE_READY, + temporal_extent_start=start, + temporal_extent_end=end, + is_mosaic=is_mosaic, + has_time=has_time, + has_elevation=has_elevation, + time_regex=upload_session.mosaic_time_regex)) + created = True + else: + saved_dataset = saved_dataset_filter.get() + created = False - assert saved_dataset + assert saved_dataset - if not created: - return saved_dataset + if not created: + return saved_dataset - try: - # Update the state from session... - Upload.objects.update_from_session(upload_session, resource=saved_dataset) - - # Hide the dataset until the upload process finishes... - saved_dataset.set_dirty_state() - - # Finalize the upload... - with transaction.atomic(): - # Set default permissions on the newly created layer and send notifications - permissions = upload_session.permissions - - # Finalize Upload - resource_manager.set_permissions( - None, instance=saved_dataset, permissions=permissions, created=created) - resource_manager.update( - None, instance=saved_dataset, xml_file=xml_file, metadata_uploaded=metadata_uploaded) - resource_manager.exec( - 'set_style', None, instance=saved_dataset, sld_uploaded=sld_uploaded, sld_file=sld_file, tempdir=upload_session.tempdir) - resource_manager.exec( - 'set_time_info', None, instance=saved_dataset, time_info=upload_session.time_info) - resource_manager.set_thumbnail( - None, instance=saved_dataset) - - if Upload.objects.filter(resource=saved_dataset.get_self_resource()).exists(): - Upload.objects.filter(resource=saved_dataset.get_self_resource()).update(complete=True) - [u.set_processing_state(enumerations.STATE_PROCESSED) for u in Upload.objects.filter(resource=saved_dataset.get_self_resource())] - except Exception as e: - raise GeoNodeException(e) + try: + # Update the state from session... + Upload.objects.update_from_session(upload_session, resource=saved_dataset) + + # Hide the dataset until the upload process finishes... + saved_dataset.set_dirty_state() + + # Finalize the upload... 
+ with transaction.atomic(): + # Set default permissions on the newly created layer and send notifications + permissions = upload_session.permissions + + # Finalize Upload + resource_manager.set_permissions( + None, instance=saved_dataset, permissions=permissions, created=created) + resource_manager.update( + None, instance=saved_dataset, xml_file=xml_file, metadata_uploaded=metadata_uploaded) + resource_manager.exec( + 'set_style', None, instance=saved_dataset, sld_uploaded=sld_uploaded, sld_file=sld_file, tempdir=upload_session.tempdir) + resource_manager.exec( + 'set_time_info', None, instance=saved_dataset, time_info=upload_session.time_info) + resource_manager.set_thumbnail( + None, instance=saved_dataset) + + if Upload.objects.filter(resource=saved_dataset.get_self_resource()).exists(): + Upload.objects.filter(resource=saved_dataset.get_self_resource()).update(complete=True) + [u.set_processing_state(enumerations.STATE_PROCESSED) for u in Upload.objects.filter(resource=saved_dataset.get_self_resource())] + except Exception as e: + raise GeoNodeException(e) + finally: + if upload.state in (enumerations.STATE_PROCESSED, enumerations.STATE_INVALID): + # Get rid if temporary files that have been uploaded via Upload form + logger.debug(f"... Cleaning up the temporary folders {upload_session.tempdir}") + shutil.rmtree(upload_session.tempdir, ignore_errors=True) finally: - if upload.state in (enumerations.STATE_PROCESSED, enumerations.STATE_INVALID): - # Get rid if temporary files that have been uploaded via Upload form - logger.debug(f"... Cleaning up the temporary folders {upload_session.tempdir}") - shutil.rmtree(upload_session.tempdir, ignore_errors=True) + lock.release() return saved_dataset
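
A note on the pattern this patch repeats through the tasks above: every Celery task body now runs inside try/finally with an explicit lock.release(), and AcquireLock.__exit__ delegates to the new release() method instead of unlocking inline, so the lock is freed both when the body returns normally and when the with block unwinds on an error. The sketch below isolates that shape; FakeCacheLock, do_work() and some_task() are hypothetical stand-ins for GeoNode's memcache_lock and the real task bodies, so it runs without Django, Celery or a cache backend.

import logging
import threading

logger = logging.getLogger(__name__)


class FakeCacheLock:
    """Hypothetical stand-in for the cache-backed lock returned by memcache_lock()."""

    _held = {}
    _guard = threading.Lock()

    def __init__(self, lock_id):
        self.lock_id = lock_id

    def acquire(self, blocking=False):
        # Non-blocking "add if absent" semantics, roughly what a memcached add gives you.
        with self._guard:
            if self._held.get(self.lock_id):
                return False
            self._held[self.lock_id] = True
            return True

    def release(self):
        with self._guard:
            self._held.pop(self.lock_id, None)


class AcquireLock:
    """Shape of the patched context manager: __exit__ simply delegates to release()."""

    def __init__(self, lock_id, blocking=False):
        self.lock_id = lock_id
        self.blocking = blocking
        self.lock = None

    def __enter__(self):
        self.lock = FakeCacheLock(self.lock_id)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.release()

    def acquire(self):
        return self.lock.acquire(self.blocking)

    def release(self):
        # Defensive release: never let a failed unlock mask the task's own exception.
        if self.lock:
            try:
                self.lock.release()
            except Exception as e:
                logger.exception(e)


def do_work():
    # Placeholder for gs_slurp(), sync_instance_with_geoserver(), etc.
    return "done"


def some_task(task_id):
    # The pattern the patch applies to each Celery task: the body runs inside
    # try, and the lock is released in finally even when the body raises.
    with AcquireLock(f"{task_id}") as lock:
        if lock.acquire() is True:
            try:
                return do_work()
            finally:
                lock.release()


if __name__ == "__main__":
    print(some_task("upload-workflow-save_step"))

Because release() drops the key unconditionally and swallows unlock failures, the double call that now happens on every task (once from the finally block, once from __exit__) is harmless.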
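
The stale-session cleanup in finalize_incomplete_session_uploads is driven by a Celery chord: one cleanup signature per stale upload id, an immutable finalizer as the chord body, and an error callback attached with on_error(). A minimal sketch of that wiring, assuming a throw-away Celery app whose broker/backend URLs are placeholders rather than GeoNode's real configuration:

from celery import Celery, chord

# Placeholder app: chords need a broker and a result backend; these in-memory
# transports are illustrative only and are not GeoNode's settings.
app = Celery("sketch", broker="memory://", backend="cache+memory://")


@app.task
def _upload_session_cleanup(upload_id):
    # Stand-in for invalidating and cleaning up a single stale upload session.
    return upload_id


@app.task
def _upload_workflow_finalizer(task_name, upload_ids):
    return f"{task_name} finished for {upload_ids}"


@app.task
def _upload_workflow_error(task_name, upload_ids):
    return f"{task_name} failed for {upload_ids}"


def finalize(upload_ids):
    # One cleanup signature per stale upload, an immutable finalizer as the
    # chord body, and an error callback attached with on_error(), as in the patch.
    header = [_upload_session_cleanup.signature(args=(_id,)) for _id in upload_ids]
    body = _upload_workflow_finalizer.signature(
        args=("_upload_session_cleanup", upload_ids), immutable=True
    ).on_error(
        _upload_workflow_error.signature(
            args=("_upload_session_cleanup", upload_ids), immutable=True
        )
    )
    # A running worker is required for the chord to actually execute.
    return chord(header, body=body).apply_async()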
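
Finally, resouce_service_dispatcher resolves the stored func_name against resource_manager and rebuilds the call from the persisted input_params via inspect.signature before invoking it. The sketch below keeps only that binding logic, using a hypothetical FakeResourceManager and leaving out the ExecutionRequest status transitions (READY, RUNNING, FINISHED/FAILED) that the real task performs:

from inspect import Parameter, Signature, signature


class FakeResourceManager:
    """Hypothetical stand-in for GeoNode's resource_manager."""

    def delete(self, uuid: str, instance=None) -> bool:
        print(f"deleting {uuid}")
        return True


def dispatch(manager, func_name, input_params):
    func = getattr(manager, func_name)
    sig = signature(func)
    _args, _kwargs = [], {}
    for _name, _param in sig.parameters.items():
        if _name in input_params:
            _value = input_params[_name]
            if _param.kind == Parameter.POSITIONAL_ONLY:
                _args.append(_value)
            else:
                _kwargs[_name] = _value
    _bound = sig.bind(*_args, **_kwargs)
    _bound.apply_defaults()
    _output = func(*_bound.args, **_bound.kwargs)
    # As in the patch, only plain builtin return values are serialised directly.
    if _output is not None and sig.return_annotation is not Signature.empty \
            and sig.return_annotation.__module__ == "builtins":
        return {"output": _output}
    return {"output": None}


if __name__ == "__main__":
    print(dispatch(FakeResourceManager(), "delete", {"uuid": "abc-123"}))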