Skip to content

Commit

Permalink
Move publishing into a celery task
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandenburgh committed Jun 8, 2022
1 parent 63c47f7 commit cbd1dc1
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 73 deletions.
43 changes: 43 additions & 0 deletions dandiapi/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.db.transaction import atomic
import jsonschema.exceptions

from dandiapi.api import doi
from dandiapi.api.checksum import calculate_sha256_checksum
from dandiapi.api.doi import delete_doi
from dandiapi.api.manifests import (
Expand Down Expand Up @@ -187,3 +188,45 @@ def unembargo_dandiset(dandiset_id: int):
def cancel_zarr_upload(zarr_id: str):
zarr_archive: ZarrArchive = ZarrArchive.objects.select_for_update().get(zarr_id=zarr_id)
zarr_archive.cancel_upload()


@shared_task
@atomic
def publish_task(version_id: int):
old_version: Version = Version.objects.get(id=version_id)
new_version: Version = old_version.publish_version

new_version.doi = doi.create_doi(new_version)
new_version.save()

# Bulk create the join table rows to optimize linking assets to new_version
AssetVersions = Version.assets.through # noqa: N806

# Add a new many-to-many association directly to any already published assets
already_published_assets = old_version.assets.filter(published=True)
AssetVersions.objects.bulk_create(
[
AssetVersions(asset_id=asset['id'], version_id=new_version.id)
for asset in already_published_assets.values('id')
]
)

# Publish any draft assets
draft_assets = old_version.assets.filter(published=False).all()
for draft_asset in draft_assets:
draft_asset.publish()
Asset.objects.bulk_update(draft_assets, ['metadata', 'published'])

AssetVersions.objects.bulk_create(
[AssetVersions(asset_id=asset.id, version_id=new_version.id) for asset in draft_assets]
)

# Save again to recompute metadata, specifically assetsSummary
new_version.save()

# Set the version of the draft to PUBLISHED so that it cannot be publishd again without
# being modified and revalidated
old_version.status = Version.Status.PUBLISHED
old_version.save()

write_manifest_files.delay(new_version.id)
96 changes: 23 additions & 73 deletions dandiapi/api/views/version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.db import DatabaseError, transaction
from django.db.models import QuerySet
from __future__ import annotations

from django.db import transaction
from django.utils.decorators import method_decorator
from drf_yasg.utils import no_body, swagger_auto_schema
from guardian.decorators import permission_required_or_403
Expand All @@ -11,9 +12,8 @@
from rest_framework.viewsets import ReadOnlyModelViewSet
from rest_framework_extensions.mixins import DetailSerializerMixin, NestedViewSetMixin

from dandiapi.api import doi
from dandiapi.api.models import Dandiset, Version
from dandiapi.api.tasks import delete_doi_task, write_manifest_files
from dandiapi.api.tasks import delete_doi_task, publish_task
from dandiapi.api.views.common import DANDISET_PK_PARAM, VERSION_PARAM, DandiPagination
from dandiapi.api.views.serializers import (
VersionDetailSerializer,
Expand Down Expand Up @@ -103,91 +103,41 @@ def update(self, request, **kwargs):
responses={200: VersionSerializer()},
)
@action(detail=True, methods=['POST'])
@transaction.atomic
@transaction.atomic # needed for `select_for_update`
@method_decorator(permission_required_or_403('owner', (Dandiset, 'pk', 'dandiset__pk')))
def publish(self, request, **kwargs):
"""Publish a version."""
# Note: instead of using self.get_object() here, retrieve the Version/perform permission
# checks manually. This is so we can use select_for_update to lock rows to prevent
# double publishing
queryset: QuerySet[Version] = self.get_queryset()
try:
old_version: Version = get_object_or_404(
# `nowait` is used here so that a `DatabaseError` is thrown if this Version is
# already locked. The default behavior of `select_for_update` is to block until
# the lock can be acquired, but in this case we don't need to do that and incur
# the performance hit that comes with it.
queryset.select_for_update(nowait=True),
**kwargs,
)
except DatabaseError:
# If this Version is already locked, return a 400 error
return Response(
'Dandiset is already in the process of being published', status.HTTP_423_LOCKED
)
version: Version | None = self.get_queryset().select_for_update().filter(**kwargs).first()

# Check permissions. get_object() normally calls this, but since we're not
# using it we must call this explicitly.
self.check_object_permissions(self.request, old_version)
if version is None:
return Response('Version not found', status=status.HTTP_404_NOT_FOUND)

if old_version.version != 'draft':
if version.version != 'draft':
return Response(
'Only draft versions can be published',
status=status.HTTP_405_METHOD_NOT_ALLOWED,
)
if (
old_version.dandiset.zarr_archives.exists()
or old_version.dandiset.embargoed_zarr_archives.exists()
version.dandiset.zarr_archives.exists()
or version.dandiset.embargoed_zarr_archives.exists()
):
raise ValidationError('Cannot publish dandisets which contain zarrs')
if not old_version.valid:
return Response(
'Dandiset metadata or asset metadata is not valid',
status=status.HTTP_400_BAD_REQUEST,
if not version.valid:
resp_text = (
'No changes since last publish'
if version.status == Version.Status.PUBLISHED
else 'Dandiset is currently being published'
if version.status == Version.Status.PUBLISHING
else 'Dandiset metadata or asset metadata is not valid'
)
return Response(resp_text, status=status.HTTP_400_BAD_REQUEST)

new_version = old_version.publish_version

new_version.doi = doi.create_doi(new_version)

new_version.save()
# Bulk create the join table rows to optimize linking assets to new_version
AssetVersions = Version.assets.through # noqa: N806
version.status = Version.Status.PUBLISHING
version.save(update_fields=['status'])

# Add a new many-to-many association directly to any already published assets
already_published_assets = old_version.assets.filter(published=True)
AssetVersions.objects.bulk_create(
[
AssetVersions(asset_id=asset['id'], version_id=new_version.id)
for asset in already_published_assets.values('id')
]
)
publish_task.delay(version.id)

# Publish any draft assets
# Import here to avoid dependency cycle
from dandiapi.api.models import Asset

draft_assets = old_version.assets.filter(published=False).all()
for draft_asset in draft_assets:
draft_asset.publish()
Asset.objects.bulk_update(draft_assets, ['metadata', 'published'])

AssetVersions.objects.bulk_create(
[AssetVersions(asset_id=asset.id, version_id=new_version.id) for asset in draft_assets]
)

# Save again to recompute metadata, specifically assetsSummary
new_version.save()

# Set the version of the draft to PUBLISHED so that it cannot be publishd again without
# being modified and revalidated
old_version.status = Version.Status.PUBLISHED
old_version.save()

write_manifest_files.delay(new_version.id)

serializer = VersionSerializer(new_version)
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(None, status=status.HTTP_200_OK)

@swagger_auto_schema(
manual_parameters=[DANDISET_PK_PARAM, VERSION_PARAM],
Expand Down

0 comments on commit cbd1dc1

Please sign in to comment.