Skip to content

Commit

Permalink
[Fixes #9064] Improve Upload Workflow resources state management
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Apr 10, 2022
1 parent c4f9ca8 commit 725c2e2
Show file tree
Hide file tree
Showing 7 changed files with 612 additions and 572 deletions.
1 change: 1 addition & 0 deletions geonode/base/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class HierarchicalKeywordViewSet(WithDynamicViewSetMixin, ListModelMixin, Retrie
"""
API endpoint that lists hierarchical keywords.
"""

def get_queryset(self):
resource_keywords = HierarchicalKeyword.resource_keywords_tree(self.request.user)
slugs = [obj.get('href') for obj in resource_keywords]
Expand Down
130 changes: 75 additions & 55 deletions geonode/geoserver/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def geoserver_update_datasets(self, *args, **kwargs):
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
return gs_slurp(*args, **kwargs)
try:
return gs_slurp(*args, **kwargs)
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -109,6 +112,8 @@ def geoserver_set_style(
base_file=base_file)
except Exception as e:
logger.exception(e)
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -144,48 +149,51 @@ def geoserver_create_style(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True and instance:
f = None
if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK):
if os.path.isfile(sld_file):
try:
f = open(sld_file)
except Exception:
pass
elif tempdir and os.path.exists(tempdir):
if os.path.isfile(os.path.join(tempdir, sld_file)):
try:
f = None
if sld_file and os.path.exists(sld_file) and os.access(sld_file, os.R_OK):
if os.path.isfile(sld_file):
try:
f = open(os.path.join(tempdir, sld_file))
f = open(sld_file)
except Exception:
pass
if f:
sld = f.read()
f.close()
if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE):
style = gs_catalog.create_style(
name,
sld,
raw=True,
workspace=settings.DEFAULT_WORKSPACE)
gs_dataset = gs_catalog.get_layer(name)
_default_style = gs_dataset.default_style
gs_dataset.default_style = style
gs_catalog.save(gs_dataset)
set_styles(instance, gs_catalog)
try:
gs_catalog.delete(_default_style)
Link.objects.filter(
resource=instance.resourcebase_ptr,
name='Legend',
url__contains=f'STYLE={_default_style.name}').delete()
except Exception as e:
logger.exception(e)
elif tempdir and os.path.exists(tempdir):
if os.path.isfile(os.path.join(tempdir, sld_file)):
try:
f = open(os.path.join(tempdir, sld_file))
except Exception:
pass
if f:
sld = f.read()
f.close()
if not gs_catalog.get_style(name=name, workspace=settings.DEFAULT_WORKSPACE):
style = gs_catalog.create_style(
name,
sld,
raw=True,
workspace=settings.DEFAULT_WORKSPACE)
gs_dataset = gs_catalog.get_layer(name)
_default_style = gs_dataset.default_style
gs_dataset.default_style = style
gs_catalog.save(gs_dataset)
set_styles(instance, gs_catalog)
try:
gs_catalog.delete(_default_style)
Link.objects.filter(
resource=instance.resourcebase_ptr,
name='Legend',
url__contains=f'STYLE={_default_style.name}').delete()
except Exception as e:
logger.exception(e)
else:
get_sld_for(gs_catalog, instance)
else:
get_sld_for(gs_catalog, instance)
else:
get_sld_for(gs_catalog, instance)
if not f:
# this signal is used by the mapstore client to set the style in visual mode
geoserver_automatic_default_style_set.send_robust(sender=instance, instance=instance)
if not f:
# this signal is used by the mapstore client to set the style in visual mode
geoserver_automatic_default_style_set.send_robust(sender=instance, instance=instance)
finally:
lock.release()


@app.task(
Expand All @@ -211,11 +219,14 @@ def geoserver_post_save_datasets(
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
sync_instance_with_geoserver(instance_id, *args, **kwargs)
try:
sync_instance_with_geoserver(instance_id, *args, **kwargs)

# Updating HAYSTACK Indexes if needed
if settings.HAYSTACK_SEARCH:
call_command('update_index')
# Updating HAYSTACK Indexes if needed
if settings.HAYSTACK_SEARCH:
call_command('update_index')
finally:
lock.release()


@app.task(
Expand Down Expand Up @@ -245,15 +256,18 @@ def geoserver_create_thumbnail(self, instance_id, overwrite=True, check_bbox=Tru
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
instance.set_processing_state(enumerations.STATE_RUNNING)
try:
instance.set_dirty_state()
create_gs_thumbnail(instance, overwrite=overwrite, check_bbox=check_bbox)
logger.debug(f"... Created Thumbnail for Dataset {instance.title}")
except Exception as e:
geoserver_create_thumbnail.retry(exc=e)
instance.set_processing_state(enumerations.STATE_RUNNING)
try:
instance.set_dirty_state()
create_gs_thumbnail(instance, overwrite=overwrite, check_bbox=check_bbox)
logger.debug(f"... Created Thumbnail for Dataset {instance.title}")
except Exception as e:
geoserver_create_thumbnail.retry(exc=e)
finally:
instance.set_processing_state(enumerations.STATE_PROCESSED)
finally:
instance.set_processing_state(enumerations.STATE_PROCESSED)
lock.release()


@app.task(
Expand All @@ -276,7 +290,10 @@ def geoserver_cascading_delete(self, *args, **kwargs):
lock_id = f'{self.request.id}'
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
return cascading_delete(*args, **kwargs)
try:
return cascading_delete(*args, **kwargs)
finally:
lock.release()


@app.task(
Expand All @@ -300,12 +317,15 @@ def geoserver_delete_map(self, object_id):
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
try:
map_obj = Map.objects.get(id=object_id)
except Map.DoesNotExist:
return
try:
map_obj = Map.objects.get(id=object_id)
except Map.DoesNotExist:
return

map_obj.maplayers.all().delete()
map_obj.delete()
map_obj.maplayers.all().delete()
map_obj.delete()
finally:
lock.release()


@shared_task(
Expand Down
94 changes: 49 additions & 45 deletions geonode/monitoring/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,53 +418,57 @@ def collect_metric(**options):
log.info(f'[{lock_id}] Collecting Metrics - started @ {_start_time}')
with AcquireLock(lock_id) as lock:
if lock.acquire() is True:
log.info(f'[{lock_id}] Collecting Metrics - [...acquired lock] @ {_start_time}')
try:
oservice = options['service']
if not oservice:
services = Service.objects.all()
else:
services = [oservice]
if options['list_services']:
print('available services')
log.info(f'[{lock_id}] Collecting Metrics - [...acquired lock] @ {_start_time}')
try:
oservice = options['service']
if not oservice:
services = Service.objects.all()
else:
services = [oservice]
if options['list_services']:
print('available services')
for s in services:
print(' ', s.name, '(', s.url, ')')
print(' type', s.service_type.name)
print(' running on', s.host.name, s.host.ip)
print(' active:', s.active)
if s.last_check:
print(' last check:', s.last_check)
else:
print(' not checked yet')
print(' ')
return
c = CollectorAPI()
for s in services:
print(' ', s.name, '(', s.url, ')')
print(' type', s.service_type.name)
print(' running on', s.host.name, s.host.ip)
print(' active:', s.active)
if s.last_check:
print(' last check:', s.last_check)
else:
print(' not checked yet')
print(' ')
return
c = CollectorAPI()
for s in services:
try:
run_check(
s,
collector=c,
since=options['since'],
until=options['until'],
force_check=options['force_check'],
format=options['format'])
except Exception as e:
log.warning(e)
if not options['do_not_clear']:
log.info("Clearing old data")
c.clear_old_data()
if options['emit_notifications']:
log.info("Processing notifications for %s", options['until'])
# s = Service.objects.first()
# interval = s.check_interval
# now = datetime.utcnow().replace(tzinfo=pytz.utc)
# notifications_check = now - interval
c.emit_notifications() # notifications_check))
_end_time = datetime.utcnow().isoformat()
log.info(f'[{lock_id}] Collecting Metrics - finished @ {_end_time}')
except Exception as e:
log.info(f'[{lock_id}] Collecting Metrics - errored @ {_end_time}')
log.exception(e)
try:
run_check(
s,
collector=c,
since=options['since'],
until=options['until'],
force_check=options['force_check'],
format=options['format'])
except Exception as e:
log.warning(e)
if not options['do_not_clear']:
log.info("Clearing old data")
c.clear_old_data()
if options['emit_notifications']:
log.info("Processing notifications for %s", options['until'])
# s = Service.objects.first()
# interval = s.check_interval
# now = datetime.utcnow().replace(tzinfo=pytz.utc)
# notifications_check = now - interval
c.emit_notifications() # notifications_check))
_end_time = datetime.utcnow().isoformat()
log.info(f'[{lock_id}] Collecting Metrics - finished @ {_end_time}')
except Exception as e:
log.info(f'[{lock_id}] Collecting Metrics - errored @ {_end_time}')
log.exception(e)
finally:
lock.release()

log.info(f'[{lock_id}] Collecting Metrics - exit @ {_end_time}')
return (_start_time, _end_time)

Expand Down
Loading

0 comments on commit 725c2e2

Please sign in to comment.