Skip to content

Commit

Permalink
Merge pull request #529 from dandi/scheduled-draft-manifests
Browse files Browse the repository at this point in the history
Scheduled draft manifests
  • Loading branch information
dchiquito authored Sep 23, 2021
2 parents e512959 + f7680d3 commit 10cb3b9
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 32 deletions.
5 changes: 4 additions & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
release: ./manage.py migrate
web: gunicorn --bind 0.0.0.0:$PORT dandiapi.wsgi
worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO
# We are using the -B flag to launch the beat scheduler within the worker thread
# This means that we cannot have multiple workers, as they would all trigger beat events
# The alternative would be a separate worker to drive the beat, which is costly at our current Heroku dyno tier
worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -B
32 changes: 32 additions & 0 deletions dandiapi/api/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Define and register any scheduled celery tasks.
This module is imported from celery.py in a post-app-load hook.
"""

from celery import shared_task
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from django.db.transaction import atomic

from dandiapi.api.models import Version
from dandiapi.api.tasks import write_manifest_files

logger = get_task_logger(__name__)


@shared_task
@atomic
def write_draft_manifest_files():
logger.info('Writing manifest files for all draft versions')
# TODO Optimize this if it causes too much load in production.
# Rewriting every draft manifest every time is guaranteed not to miss any changes,
# so just do that for now to avoid the complexity involved with modification dates.
for draft_version in Version.objects.filter(version='draft').all():
write_manifest_files.delay(draft_version.id)


def register_scheduled_tasks(sender, **kwargs):
"""Register tasks with a celery beat schedule."""
# Write out all draft manifests every day at 1 AM
sender.add_periodic_task(crontab(hour=1), write_draft_manifest_files.s())
2 changes: 1 addition & 1 deletion dandiapi/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def calculate_sha256(blob_id: int) -> None:
@shared_task
@atomic
def write_manifest_files(version_id: int) -> None:
logger.info('Writing manifests for version %s', version_id)
version: Version = Version.objects.get(id=version_id)
logger.info('Writing manifests for version %s:%s', version.dandiset.identifier, version.version)

write_dandiset_yaml(version, logger=logger)
write_assets_yaml(version, logger=logger)
Expand Down
15 changes: 3 additions & 12 deletions dandiapi/api/tests/test_dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ def test_dandiset_rest_create(api_client, user):
assert dandiset.draft_version.metadata == {
**metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': name,
'identifier': DANDISET_SCHEMA_ID_RE,
Expand Down Expand Up @@ -320,10 +317,7 @@ def test_dandiset_rest_create_with_identifier(api_client, admin_user):
assert dandiset.draft_version.metadata == {
**metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': name,
'identifier': f'DANDI:{identifier}',
Expand Down Expand Up @@ -423,10 +417,7 @@ def test_dandiset_rest_create_with_contributor(api_client, admin_user):
assert dandiset.draft_version.metadata == {
**metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': name,
'identifier': f'DANDI:{identifier}',
Expand Down
21 changes: 5 additions & 16 deletions dandiapi/api/tests/test_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ def test_draft_version_metadata_computed(draft_version: Version):
expected_metadata = {
**original_metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': draft_version.name,
'identifier': f'DANDI:{draft_version.dandiset.identifier}',
Expand Down Expand Up @@ -298,8 +295,7 @@ def test_version_publish_version(draft_version, asset):
'dateCreated': UTC_ISO_TIMESTAMP_RE,
'datePublished': UTC_ISO_TIMESTAMP_RE,
'manifestLocation': [
f'http://localhost:9000/test-dandiapi-dandisets/test-prefix/dandisets/'
f'{publish_version.dandiset.identifier}/{publish_version.version}/assets.yaml',
f'http://localhost:9000/test-dandiapi-dandisets/test-prefix/dandisets/{publish_version.dandiset.identifier}/{publish_version.version}/assets.yaml', # noqa: E501
],
'identifier': f'DANDI:{publish_version.dandiset.identifier}',
'version': publish_version.version,
Expand Down Expand Up @@ -452,10 +448,7 @@ def test_version_rest_update(api_client, user, draft_version):
saved_metadata = {
**new_metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': new_name,
'identifier': f'DANDI:{draft_version.dandiset.identifier}',
Expand Down Expand Up @@ -522,10 +515,7 @@ def test_version_rest_update_large(api_client, user, draft_version):
saved_metadata = {
**new_metadata,
'manifestLocation': [
(
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}'
f'/versions/draft/assets/'
)
f'https://api.dandiarchive.org/api/dandisets/{draft_version.dandiset.identifier}/versions/draft/assets/' # noqa: E501
],
'name': new_name,
'identifier': f'DANDI:{draft_version.dandiset.identifier}',
Expand Down Expand Up @@ -666,8 +656,7 @@ def test_version_rest_publish(api_client, user: User, draft_version: Version, as
},
'datePublished': UTC_ISO_TIMESTAMP_RE,
'manifestLocation': [
f'http://localhost:9000/test-dandiapi-dandisets/test-prefix/dandisets/'
f'{draft_version.dandiset.identifier}/{published_version.version}/assets.yaml',
f'http://localhost:9000/test-dandiapi-dandisets/test-prefix/dandisets/{draft_version.dandiset.identifier}/{published_version.version}/assets.yaml', # noqa: E501
],
'identifier': f'DANDI:{draft_version.dandiset.identifier}',
'version': published_version.version,
Expand Down
9 changes: 8 additions & 1 deletion dandiapi/celery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from celery import Celery
from celery import Celery, signals
import configurations.importer

os.environ['DJANGO_SETTINGS_MODULE'] = 'dandiapi.settings'
Expand All @@ -14,3 +14,10 @@

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@signals.import_modules.connect
def _register_scheduled_tasks(sender, **kwargs):
from dandiapi.api.scheduled_tasks import register_scheduled_tasks

register_scheduled_tasks(sender, **kwargs)
3 changes: 2 additions & 1 deletion docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ services:
"--app", "dandiapi.celery",
"worker",
"--loglevel", "INFO",
"--without-heartbeat"
"--without-heartbeat",
"-B"
]
# Docker Compose does not set the TTY width, which causes Celery errors
tty: false
Expand Down

0 comments on commit 10cb3b9

Please sign in to comment.