From cbd1dc1e51220d677370982c40da1c80c5f6655b Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Wed, 8 Jun 2022 14:39:37 -0400 Subject: [PATCH] Move publishing into a celery task --- dandiapi/api/tasks/__init__.py | 43 +++++++++++++++ dandiapi/api/views/version.py | 96 ++++++++-------------------------- 2 files changed, 66 insertions(+), 73 deletions(-) diff --git a/dandiapi/api/tasks/__init__.py b/dandiapi/api/tasks/__init__.py index 3ebb2a83e..3f3eda097 100644 --- a/dandiapi/api/tasks/__init__.py +++ b/dandiapi/api/tasks/__init__.py @@ -5,6 +5,7 @@ from django.db.transaction import atomic import jsonschema.exceptions +from dandiapi.api import doi from dandiapi.api.checksum import calculate_sha256_checksum from dandiapi.api.doi import delete_doi from dandiapi.api.manifests import ( @@ -187,3 +188,45 @@ def unembargo_dandiset(dandiset_id: int): def cancel_zarr_upload(zarr_id: str): zarr_archive: ZarrArchive = ZarrArchive.objects.select_for_update().get(zarr_id=zarr_id) zarr_archive.cancel_upload() + + +@shared_task +@atomic +def publish_task(version_id: int): + old_version: Version = Version.objects.get(id=version_id) + new_version: Version = old_version.publish_version + + new_version.doi = doi.create_doi(new_version) + new_version.save() + + # Bulk create the join table rows to optimize linking assets to new_version + AssetVersions = Version.assets.through # noqa: N806 + + # Add a new many-to-many association directly to any already published assets + already_published_assets = old_version.assets.filter(published=True) + AssetVersions.objects.bulk_create( + [ + AssetVersions(asset_id=asset['id'], version_id=new_version.id) + for asset in already_published_assets.values('id') + ] + ) + + # Publish any draft assets + draft_assets = old_version.assets.filter(published=False).all() + for draft_asset in draft_assets: + draft_asset.publish() + Asset.objects.bulk_update(draft_assets, ['metadata', 'published']) + + AssetVersions.objects.bulk_create( + 
[AssetVersions(asset_id=asset.id, version_id=new_version.id) for asset in draft_assets] + ) + + # Save again to recompute metadata, specifically assetsSummary + new_version.save() + + # Set the version of the draft to PUBLISHED so that it cannot be published again without + # being modified and revalidated + old_version.status = Version.Status.PUBLISHED + old_version.save() + + write_manifest_files.delay(new_version.id) diff --git a/dandiapi/api/views/version.py b/dandiapi/api/views/version.py index 66ee0cefd..9278d498c 100644 --- a/dandiapi/api/views/version.py +++ b/dandiapi/api/views/version.py @@ -1,5 +1,6 @@ -from django.db import DatabaseError, transaction -from django.db.models import QuerySet +from __future__ import annotations + +from django.db import transaction from django.utils.decorators import method_decorator from drf_yasg.utils import no_body, swagger_auto_schema from guardian.decorators import permission_required_or_403 @@ -11,9 +12,8 @@ from rest_framework.viewsets import ReadOnlyModelViewSet from rest_framework_extensions.mixins import DetailSerializerMixin, NestedViewSetMixin -from dandiapi.api import doi from dandiapi.api.models import Dandiset, Version -from dandiapi.api.tasks import delete_doi_task, write_manifest_files +from dandiapi.api.tasks import delete_doi_task, publish_task from dandiapi.api.views.common import DANDISET_PK_PARAM, VERSION_PARAM, DandiPagination from dandiapi.api.views.serializers import ( VersionDetailSerializer, @@ -103,91 +103,41 @@ def update(self, request, **kwargs): responses={200: VersionSerializer()}, ) @action(detail=True, methods=['POST']) - @transaction.atomic + @transaction.atomic # needed for `select_for_update` @method_decorator(permission_required_or_403('owner', (Dandiset, 'pk', 'dandiset__pk'))) def publish(self, request, **kwargs): """Publish a version.""" - # Note: instead of using self.get_object() here, retrieve the Version/perform permission - # checks manually. 
This is so we can use select_for_update to lock rows to prevent - # double publishing - queryset: QuerySet[Version] = self.get_queryset() - try: - old_version: Version = get_object_or_404( - # `nowait` is used here so that a `DatabaseError` is thrown if this Version is - # already locked. The default behavior of `select_for_update` is to block until - # the lock can be acquired, but in this case we don't need to do that and incur - # the performance hit that comes with it. - queryset.select_for_update(nowait=True), - **kwargs, - ) - except DatabaseError: - # If this Version is already locked, return a 400 error - return Response( - 'Dandiset is already in the process of being published', status.HTTP_423_LOCKED - ) + version: Version | None = self.get_queryset().select_for_update().filter(**kwargs).first() - # Check permissions. get_object() normally calls this, but since we're not - # using it we must call this explicitly. - self.check_object_permissions(self.request, old_version) + if version is None: + return Response('Version not found', status=status.HTTP_404_NOT_FOUND) - if old_version.version != 'draft': + if version.version != 'draft': return Response( 'Only draft versions can be published', status=status.HTTP_405_METHOD_NOT_ALLOWED, ) if ( - old_version.dandiset.zarr_archives.exists() - or old_version.dandiset.embargoed_zarr_archives.exists() + version.dandiset.zarr_archives.exists() + or version.dandiset.embargoed_zarr_archives.exists() ): raise ValidationError('Cannot publish dandisets which contain zarrs') - if not old_version.valid: - return Response( - 'Dandiset metadata or asset metadata is not valid', - status=status.HTTP_400_BAD_REQUEST, + if not version.valid: + resp_text = ( + 'No changes since last publish' + if version.status == Version.Status.PUBLISHED + else 'Dandiset is currently being published' + if version.status == Version.Status.PUBLISHING + else 'Dandiset metadata or asset metadata is not valid' ) + return Response(resp_text, 
status=status.HTTP_400_BAD_REQUEST) - new_version = old_version.publish_version - - new_version.doi = doi.create_doi(new_version) - - new_version.save() - # Bulk create the join table rows to optimize linking assets to new_version - AssetVersions = Version.assets.through # noqa: N806 + version.status = Version.Status.PUBLISHING + version.save(update_fields=['status']) - # Add a new many-to-many association directly to any already published assets - already_published_assets = old_version.assets.filter(published=True) - AssetVersions.objects.bulk_create( - [ - AssetVersions(asset_id=asset['id'], version_id=new_version.id) - for asset in already_published_assets.values('id') - ] - ) + publish_task.delay(version.id) - # Publish any draft assets - # Import here to avoid dependency cycle - from dandiapi.api.models import Asset - - draft_assets = old_version.assets.filter(published=False).all() - for draft_asset in draft_assets: - draft_asset.publish() - Asset.objects.bulk_update(draft_assets, ['metadata', 'published']) - - AssetVersions.objects.bulk_create( - [AssetVersions(asset_id=asset.id, version_id=new_version.id) for asset in draft_assets] - ) - - # Save again to recompute metadata, specifically assetsSummary - new_version.save() - - # Set the version of the draft to PUBLISHED so that it cannot be publishd again without - # being modified and revalidated - old_version.status = Version.Status.PUBLISHED - old_version.save() - - write_manifest_files.delay(new_version.id) - - serializer = VersionSerializer(new_version) - return Response(serializer.data, status=status.HTTP_200_OK) + return Response(None, status=status.HTTP_200_OK) @swagger_auto_schema( manual_parameters=[DANDISET_PK_PARAM, VERSION_PARAM],