Skip to content

Commit

Permalink
Merge pull request #1965 from dandi/automated-unembargo
Browse files Browse the repository at this point in the history
Automate dandiset unembargo
  • Loading branch information
jjnesbitt authored Jul 22, 2024
2 parents ed194fc + bcbb10b commit 984192c
Show file tree
Hide file tree
Showing 21 changed files with 382 additions and 138 deletions.
66 changes: 39 additions & 27 deletions dandiapi/api/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,18 @@ def user_greeting_name(user: User, socialaccount: SocialAccount = None) -> str:
return social_user['username']


def build_message(subject: str, message: str, to: list[str], html_message: str | None = None):
email_message = mail.EmailMultiAlternatives(subject=subject, body=message, to=to)
def build_message(  # noqa: PLR0913
    to: list[str],
    subject: str,
    message: str,
    html_message: str | None = None,
    cc: list[str] | None = None,
    bcc: list[str] | None = None,
    reply_to: list[str] | None = None,
):
    """Assemble an ``EmailMultiAlternatives`` from the given recipients, subject and bodies.

    ``message`` is used as the plain-text body. When ``html_message`` is provided it is attached
    as a ``text/html`` alternative; ``cc``, ``bcc`` and ``reply_to`` are passed through as given.
    """
    email = mail.EmailMultiAlternatives(
        subject=subject,
        body=message,
        to=to,
        cc=cc,
        bcc=bcc,
        reply_to=reply_to,
    )
    # Plain-text only: nothing more to attach
    if html_message is None:
        return email

    email.attach_alternative(html_message, 'text/html')
    return email
Expand Down Expand Up @@ -182,31 +192,6 @@ def send_pending_users_message(users: Iterable[User]):
connection.send_messages(messages)


def build_dandisets_to_unembargo_message(dandisets: Iterable[Dandiset]):
dandiset_context = [
{
'identifier': ds.identifier,
'owners': [user.username for user in ds.owners],
'asset_count': ds.draft_version.asset_count,
'size': ds.draft_version.size,
}
for ds in dandisets
]
render_context = {**BASE_RENDER_CONTEXT, 'dandisets': dandiset_context}
return build_message(
subject='DANDI: New Dandisets to unembargo',
message=render_to_string('api/mail/dandisets_to_unembargo.txt', render_context),
to=[settings.DANDI_DEV_EMAIL],
)


def send_dandisets_to_unembargo_message(dandisets: Iterable[Dandiset]):
logger.info('Sending dandisets to unembargo message to devs at %s', settings.DANDI_DEV_EMAIL)
messages = [build_dandisets_to_unembargo_message(dandisets)]
with mail.get_connection() as connection:
connection.send_messages(messages)


def build_dandiset_unembargoed_message(dandiset: Dandiset):
dandiset_context = {
'identifier': dandiset.identifier,
Expand All @@ -231,3 +216,30 @@ def send_dandiset_unembargoed_message(dandiset: Dandiset):
messages = [build_dandiset_unembargoed_message(dandiset)]
with mail.get_connection() as connection:
connection.send_messages(messages)


def build_dandiset_unembargo_failed_message(dandiset: Dandiset):
    """Build the email telling a dandiset's owners that its unembargo failed.

    The HTML body is rendered from ``api/mail/dandiset_unembargo_failed.html`` and the plain-text
    body is that same HTML with its tags stripped. The owners are the direct recipients, the dev
    address is BCC'd, and replies are directed to ``ADMIN_EMAIL``.
    """
    # NOTE(review): only ``identifier`` is supplied to the template here, while the failure
    # template appears to also read ``dandiset.ui_link`` -- confirm whether a link is missing.
    template_dandiset = {'identifier': dandiset.identifier}
    context = {**BASE_RENDER_CONTEXT, 'dandiset': template_dandiset}

    html_body = render_to_string('api/mail/dandiset_unembargo_failed.html', context)
    subject = f'DANDI: Unembargo failed for dandiset {dandiset.identifier}'
    text_body = strip_tags(html_body)
    owner_emails = [owner.email for owner in dandiset.owners]

    return build_message(
        to=owner_emails,
        subject=subject,
        message=text_body,
        html_message=html_body,
        bcc=[settings.DANDI_DEV_EMAIL],
        reply_to=[ADMIN_EMAIL],
    )


def send_dandiset_unembargo_failed_message(dandiset: Dandiset):
    """Build the unembargo-failed email for ``dandiset`` and send it over a mail connection."""
    logger.info(
        'Sending dandiset unembargo failed message for dandiset %s to dandiset owners and devs',
        dandiset.identifier,
    )
    failure_email = build_dandiset_unembargo_failed_message(dandiset)
    with mail.get_connection() as conn:
        conn.send_messages([failure_email])
4 changes: 4 additions & 0 deletions dandiapi/api/models/dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def identifier(self) -> str:
def embargoed(self) -> bool:
return self.embargo_status == self.EmbargoStatus.EMBARGOED

@property
def unembargo_in_progress(self) -> bool:
    """Whether this dandiset's embargo status is currently ``UNEMBARGOING``."""
    current_status = self.embargo_status
    return current_status == self.EmbargoStatus.UNEMBARGOING

@property
def most_recent_published_version(self):
return self.versions.exclude(version='draft').order_by('modified').last()
Expand Down
2 changes: 1 addition & 1 deletion dandiapi/api/services/dandiset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def delete_dandiset(*, user, dandiset: Dandiset) -> None:
raise NotAllowedError('Cannot delete dandisets with published versions.')
if dandiset.versions.filter(status=Version.Status.PUBLISHING).exists():
raise NotAllowedError('Cannot delete dandisets that are currently being published.')
if dandiset.embargo_status == Dandiset.EmbargoStatus.UNEMBARGOING:
if dandiset.unembargo_in_progress:
raise DandisetUnembargoInProgressError

# Delete all versions first, so that AssetPath deletion is cascaded
Expand Down
100 changes: 72 additions & 28 deletions dandiapi/api/services/embargo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import logging
from typing import TYPE_CHECKING

from botocore.config import Config
from django.conf import settings
from django.db import transaction
from more_itertools import chunked

from dandiapi.api.mail import send_dandisets_to_unembargo_message
from dandiapi.api.models import Asset, AssetBlob, Dandiset, Version
from dandiapi.api.mail import send_dandiset_unembargoed_message
from dandiapi.api.models import AssetBlob, Dandiset, Version
from dandiapi.api.services.asset.exceptions import DandisetOwnerRequiredError
from dandiapi.api.services.exceptions import DandiError
from dandiapi.api.storage import get_boto_client
from dandiapi.api.tasks import unembargo_dandiset_task

from .exceptions import (
AssetBlobEmbargoedError,
AssetTagRemovalError,
DandisetActiveUploadsError,
DandisetNotEmbargoedError,
)

if TYPE_CHECKING:
from django.contrib.auth.models import User
from django.db.models import QuerySet
from mypy_boto3_s3 import S3Client


logger = logging.getLogger(__name__)
ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000


def _delete_asset_blob_tags(client: S3Client, blob: str):
client.delete_object_tagging(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
Expand All @@ -33,28 +41,66 @@ def _delete_asset_blob_tags(client: S3Client, blob: str):

# NOTE: In testing this took ~2 minutes for 100,000 files
def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset):
# First we need to generate a CSV manifest containing all asset blobs that need to be untaged
embargoed_asset_blobs = AssetBlob.objects.filter(
embargoed=True, assets__versions__dandiset=dandiset
).values_list('blob', flat=True)

client = get_boto_client(config=Config(max_pool_connections=100))
with ThreadPoolExecutor(max_workers=100) as e:
for blob in embargoed_asset_blobs:
e.submit(_delete_asset_blob_tags, client=client, blob=blob)
embargoed_asset_blobs = (
AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset)
.values_list('blob', flat=True)
.iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
)

# Chunk the blobs so we're never storing a list of all embargoed blobs
chunks = chunked(embargoed_asset_blobs, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
for chunk in chunks:
with ThreadPoolExecutor(max_workers=100) as e:
futures = [
e.submit(_delete_asset_blob_tags, client=client, blob=blob) for blob in chunk
]

# Check if any failed and raise exception if so
failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None]
if failed:
raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)


@transaction.atomic()
def _unembargo_dandiset(dandiset: Dandiset):
# NOTE: Before proceeding, all asset blobs must have their embargoed tags removed in s3
def unembargo_dandiset(ds: Dandiset):
"""Unembargo a dandiset: untag its embargoed asset blobs in S3 and mark the dandiset open."""
logger.info('Unembargoing Dandiset %s', ds.identifier)
logger.info('\t%s assets', ds.draft_version.assets.count())

if ds.embargo_status != Dandiset.EmbargoStatus.UNEMBARGOING:
raise DandiError(
message=f'Expected dandiset state {Dandiset.EmbargoStatus.UNEMBARGOING}, found {ds.embargo_status}', # noqa: E501
http_status_code=500,
)
if ds.uploads.all().exists():
raise DandisetActiveUploadsError(http_status_code=500)

# Remove tags in S3
logger.info('Removing tags...')
_remove_dandiset_asset_blob_embargo_tags(ds)

# Update embargoed flag on asset blobs
updated = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
embargoed=False
)
logger.info('Updated %s asset blobs', updated)

# Set status to OPEN
Dandiset.objects.filter(pk=ds.pk).update(embargo_status=Dandiset.EmbargoStatus.OPEN)
logger.info('Dandiset embargo status updated')

draft_version: Version = dandiset.draft_version
embargoed_assets: QuerySet[Asset] = draft_version.assets.filter(blob__embargoed=True)
AssetBlob.objects.filter(assets__in=embargoed_assets).update(embargoed=False)
# Fetch version to ensure changed embargo_status is included
# Save version to update metadata through populate_metadata
v = Version.objects.select_for_update().get(dandiset=ds, version='draft')
v.save()
logger.info('Version metadata updated')

# Set access on dandiset
dandiset.embargo_status = Dandiset.EmbargoStatus.OPEN
dandiset.save()
# Notify owners of completed unembargo
send_dandiset_unembargoed_message(ds)
logger.info('Dandiset owners notified.')

logger.info('...Done')


def remove_asset_blob_embargoed_tag(asset_blob: AssetBlob) -> None:
Expand All @@ -65,8 +111,8 @@ def remove_asset_blob_embargoed_tag(asset_blob: AssetBlob) -> None:
_delete_asset_blob_tags(client=get_boto_client(), blob=asset_blob.blob.name)


def unembargo_dandiset(*, user: User, dandiset: Dandiset):
"""Unembargo a dandiset by copying all embargoed asset blobs to the public bucket."""
def kickoff_dandiset_unembargo(*, user: User, dandiset: Dandiset):
"""Set the dandiset status to UNEMBARGOING and kick off the unembargo task."""
if dandiset.embargo_status != Dandiset.EmbargoStatus.EMBARGOED:
raise DandisetNotEmbargoedError

Expand All @@ -76,10 +122,8 @@ def unembargo_dandiset(*, user: User, dandiset: Dandiset):
if dandiset.uploads.count():
raise DandisetActiveUploadsError

# A scheduled task will pick up any new dandisets with this status and email the admins to
# initiate the unembargo process
dandiset.embargo_status = Dandiset.EmbargoStatus.UNEMBARGOING
dandiset.save()

# Send initial email to ensure it's seen in a timely manner
send_dandisets_to_unembargo_message(dandisets=[dandiset])
with transaction.atomic():
Dandiset.objects.filter(pk=dandiset.pk).update(
embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING
)
transaction.on_commit(lambda: unembargo_dandiset_task.delay(dandiset.pk))
6 changes: 6 additions & 0 deletions dandiapi/api/services/embargo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ class UnauthorizedEmbargoAccessError(DandiError):
message = (
'Authentication credentials must be provided when attempting to access embargoed dandisets'
)


class AssetTagRemovalError(Exception):
    """Raised when removing the tags from one or more asset blobs fails.

    ``blobs`` holds the blobs whose tag removal failed, exactly as given by the raiser.
    """

    def __init__(self, message: str, blobs: list[str]) -> None:
        super().__init__(message)
        self.blobs = blobs

    def __reduce__(self):
        """Describe how to rebuild this exception when it is pickled or copied."""
        # The default ``BaseException.__reduce__`` re-creates the exception from ``self.args``,
        # which only holds ``message``; that call would fail because ``blobs`` is a required
        # argument. Supplying both arguments keeps the exception picklable and copyable
        # (presumably relevant when a task worker serializes the failure -- confirm) without
        # changing ``str()`` or ``.args``.
        return (type(self), (self.args[0], self.blobs), self.__dict__)
16 changes: 16 additions & 0 deletions dandiapi/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from celery.utils.log import get_task_logger

from dandiapi.api.doi import delete_doi
from dandiapi.api.mail import send_dandiset_unembargo_failed_message
from dandiapi.api.manifests import (
write_assets_jsonld,
write_assets_yaml,
Expand All @@ -12,6 +13,7 @@
write_dandiset_yaml,
)
from dandiapi.api.models import Asset, AssetBlob, Version
from dandiapi.api.models.dandiset import Dandiset

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -74,3 +76,17 @@ def publish_dandiset_task(dandiset_id: int):
from dandiapi.api.services.publish import _publish_dandiset

_publish_dandiset(dandiset_id=dandiset_id)


@shared_task(soft_time_limit=1200)
def unembargo_dandiset_task(dandiset_id: int):
    """Unembargo the dandiset with primary key ``dandiset_id``, notifying owners on failure.

    Any exception raised by the unembargo is logged and re-raised after the failure email has
    been attempted, so the task is still reported as failed. A failure to send that email is
    logged but never replaces the original error.
    """
    # Imported lazily: the embargo service module imports this task at module level
    from dandiapi.api.services.embargo import unembargo_dandiset

    ds = Dandiset.objects.get(pk=dandiset_id)

    # If the unembargo fails for any reason, send an email, but continue the error propagation
    try:
        unembargo_dandiset(ds)
    except Exception:
        logger.exception('Unembargo of dandiset with id %s failed', dandiset_id)
        try:
            send_dandiset_unembargo_failed_message(ds)
        except Exception:
            # Keep the original unembargo error as the one that propagates
            logger.exception(
                'Failed to send unembargo failure notification for dandiset with id %s',
                dandiset_id,
            )
        raise
14 changes: 1 addition & 13 deletions dandiapi/api/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
from django.db.models.query_utils import Q

from dandiapi.analytics.tasks import collect_s3_log_records_task
from dandiapi.api.mail import send_dandisets_to_unembargo_message, send_pending_users_message
from dandiapi.api.mail import send_pending_users_message
from dandiapi.api.models import UserMetadata, Version
from dandiapi.api.models.asset import Asset
from dandiapi.api.models.dandiset import Dandiset
from dandiapi.api.services.metadata import version_aggregate_assets_summary
from dandiapi.api.services.metadata.exceptions import VersionMetadataConcurrentlyModifiedError
from dandiapi.api.tasks import (
Expand Down Expand Up @@ -112,14 +111,6 @@ def send_pending_users_email() -> None:
send_pending_users_message(pending_users)


@shared_task(soft_time_limit=20)
def send_dandisets_to_unembargo_email() -> None:
"""Send an email to admins listing dandisets that have requested unembargo."""
dandisets = Dandiset.objects.filter(embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING)
if dandisets.exists():
send_dandisets_to_unembargo_message(dandisets)


@shared_task(soft_time_limit=60)
def refresh_materialized_view_search() -> None:
"""
Expand Down Expand Up @@ -148,9 +139,6 @@ def register_scheduled_tasks(sender: Celery, **kwargs):
# Send daily email to admins containing a list of users awaiting approval
sender.add_periodic_task(crontab(hour=0, minute=0), send_pending_users_email.s())

# Send daily email to admins containing a list of dandisets to unembargo
sender.add_periodic_task(crontab(hour=0, minute=0), send_dandisets_to_unembargo_email.s())

# Refresh the materialized view used by asset search every 10 mins.
sender.add_periodic_task(timedelta(minutes=10), refresh_materialized_view_search.s())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
There was an error during the unembargo of dandiset
<a href="{{ dandiset.ui_link }}">{{dandiset.identifier }}</a>. This has been reported to the developers, and will be
investigated as soon as possible. We hope to have this resolved soon! Please avoid making any major changes to this
dandiset in the meantime, as that may hinder or delay the process of resolving this issue.
If in two business days your dandiset is still embargoed, please reply to this email.
<br><br><br>
<em>You are receiving this email because you are an owner of this dandiset.</em>
10 changes: 0 additions & 10 deletions dandiapi/api/templates/api/mail/dandisets_to_unembargo.txt

This file was deleted.

6 changes: 3 additions & 3 deletions dandiapi/api/tests/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ def test_asset_create_embargo(


@pytest.mark.django_db()
def test_asset_create_unembargoing(
def test_asset_create_unembargo_in_progress(
api_client, user, draft_version_factory, dandiset_factory, embargoed_asset_blob
):
dandiset = dandiset_factory(embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING)
Expand Down Expand Up @@ -1122,7 +1122,7 @@ def test_asset_rest_update_embargo(api_client, user, draft_version, asset, embar


@pytest.mark.django_db()
def test_asset_rest_update_unembargoing(
def test_asset_rest_update_unembargo_in_progress(
api_client, user, draft_version_factory, asset, embargoed_asset_blob
):
draft_version = draft_version_factory(
Expand Down Expand Up @@ -1325,7 +1325,7 @@ def test_asset_rest_delete(api_client, user, draft_version, asset):


@pytest.mark.django_db()
def test_asset_rest_delete_unembargoing(api_client, user, draft_version_factory, asset):
def test_asset_rest_delete_unembargo_in_progress(api_client, user, draft_version_factory, asset):
draft_version = draft_version_factory(
dandiset__embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING
)
Expand Down
Loading

0 comments on commit 984192c

Please sign in to comment.