Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into audit-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
waxlamp committed Jul 24, 2024
2 parents 50d80e6 + 2d53a2f commit 90c8bc8
Show file tree
Hide file tree
Showing 23 changed files with 419 additions and 155 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
# v0.3.91 (Wed Jul 24 2024)

#### 🐛 Bug Fix

- Fix N query problem with VersionStatusFilter [#1986](https://github.com/dandi/dandi-archive/pull/1986) ([@jjnesbitt](https://github.com/jjnesbitt))

#### Authors: 1

- Jacob Nesbitt ([@jjnesbitt](https://github.com/jjnesbitt))

---

# v0.3.90 (Mon Jul 22 2024)

#### 🐛 Bug Fix

- Automate dandiset unembargo [#1965](https://github.com/dandi/dandi-archive/pull/1965) ([@jjnesbitt](https://github.com/jjnesbitt))

#### Authors: 1

- Jacob Nesbitt ([@jjnesbitt](https://github.com/jjnesbitt))

---

# v0.3.89 (Fri Jul 19 2024)

#### 🐛 Bug Fix
Expand Down
17 changes: 11 additions & 6 deletions dandiapi/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,17 @@ class VersionStatusFilter(admin.SimpleListFilter):
title = 'status'
parameter_name = 'status'

def lookups(self, request, model_admin):
qs = model_admin.get_queryset(request)
for status in qs.values_list('status', flat=True).distinct():
count = qs.filter(status=status).count()
if count:
yield (status, f'{status} ({count})')
def lookups(self, *args, **kwargs):
    """Yield one ``(value, label)`` filter choice per version status.

    The label is the status followed by the number of versions that have it,
    e.g. ``'Valid (12)'``. Positional and keyword arguments are accepted and
    ignored.
    """
    # The queryset for VersionAdmin contains unnecessary data,
    # so the counts are built from the base Version.objects manager instead.
    status_counts = Version.objects.values_list('status').distinct()
    # order_by() with no arguments clears any ordering on the queryset
    # (presumably so a default ordering cannot split the per-status groups).
    status_counts = status_counts.annotate(total=Count('status')).order_by()
    yield from ((name, f'{name} ({total})') for name, total in status_counts)

def queryset(self, request, queryset):
status = self.value()
Expand Down
66 changes: 39 additions & 27 deletions dandiapi/api/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,18 @@ def user_greeting_name(user: User, socialaccount: SocialAccount = None) -> str:
return social_user['username']


def build_message(subject: str, message: str, to: list[str], html_message: str | None = None):
email_message = mail.EmailMultiAlternatives(subject=subject, body=message, to=to)
def build_message( # noqa: PLR0913
to: list[str],
subject: str,
message: str,
html_message: str | None = None,
cc: list[str] | None = None,
bcc: list[str] | None = None,
reply_to: list[str] | None = None,
):
email_message = mail.EmailMultiAlternatives(
subject=subject, body=message, to=to, cc=cc, bcc=bcc, reply_to=reply_to
)
if html_message is not None:
email_message.attach_alternative(html_message, 'text/html')
return email_message
Expand Down Expand Up @@ -182,31 +192,6 @@ def send_pending_users_message(users: Iterable[User]):
connection.send_messages(messages)


def build_dandisets_to_unembargo_message(dandisets: Iterable[Dandiset]):
dandiset_context = [
{
'identifier': ds.identifier,
'owners': [user.username for user in ds.owners],
'asset_count': ds.draft_version.asset_count,
'size': ds.draft_version.size,
}
for ds in dandisets
]
render_context = {**BASE_RENDER_CONTEXT, 'dandisets': dandiset_context}
return build_message(
subject='DANDI: New Dandisets to unembargo',
message=render_to_string('api/mail/dandisets_to_unembargo.txt', render_context),
to=[settings.DANDI_DEV_EMAIL],
)


def send_dandisets_to_unembargo_message(dandisets: Iterable[Dandiset]):
logger.info('Sending dandisets to unembargo message to devs at %s', settings.DANDI_DEV_EMAIL)
messages = [build_dandisets_to_unembargo_message(dandisets)]
with mail.get_connection() as connection:
connection.send_messages(messages)


def build_dandiset_unembargoed_message(dandiset: Dandiset):
dandiset_context = {
'identifier': dandiset.identifier,
Expand All @@ -231,3 +216,30 @@ def send_dandiset_unembargoed_message(dandiset: Dandiset):
messages = [build_dandiset_unembargoed_message(dandiset)]
with mail.get_connection() as connection:
connection.send_messages(messages)


def build_dandiset_unembargo_failed_message(dandiset: Dandiset):
    """Build the "unembargo failed" email for a dandiset.

    The HTML body is rendered from ``api/mail/dandiset_unembargo_failed.html``
    and the plain-text body is that HTML with its tags stripped. The message is
    addressed to every owner of the dandiset, bcc'd to the dev email, and has
    the admin email as its reply-to address.
    """
    context = {**BASE_RENDER_CONTEXT, 'dandiset': {'identifier': dandiset.identifier}}
    html_body = render_to_string('api/mail/dandiset_unembargo_failed.html', context)

    subject_line = f'DANDI: Unembargo failed for dandiset {dandiset.identifier}'
    plain_body = strip_tags(html_body)
    owner_emails = [owner.email for owner in dandiset.owners]

    return build_message(
        subject=subject_line,
        message=plain_body,
        html_message=html_body,
        to=owner_emails,
        bcc=[settings.DANDI_DEV_EMAIL],
        reply_to=[ADMIN_EMAIL],
    )


def send_dandiset_unembargo_failed_message(dandiset: Dandiset):
    """Log and send the "unembargo failed" email for ``dandiset``."""
    logger.info(
        'Sending dandiset unembargo failed message for dandiset %s to dandiset owners and devs',
        dandiset.identifier,
    )
    failure_email = build_dandiset_unembargo_failed_message(dandiset)
    with mail.get_connection() as connection:
        connection.send_messages([failure_email])
4 changes: 4 additions & 0 deletions dandiapi/api/models/dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def identifier(self) -> str:
def embargoed(self) -> bool:
return self.embargo_status == self.EmbargoStatus.EMBARGOED

@property
def unembargo_in_progress(self) -> bool:
    """Whether the embargo status of this object is ``UNEMBARGOING``."""
    current_status = self.embargo_status
    return current_status == self.EmbargoStatus.UNEMBARGOING

@property
def most_recent_published_version(self):
return self.versions.exclude(version='draft').order_by('modified').last()
Expand Down
2 changes: 1 addition & 1 deletion dandiapi/api/services/dandiset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def delete_dandiset(*, user, dandiset: Dandiset) -> None:
raise NotAllowedError('Cannot delete dandisets with published versions.')
if dandiset.versions.filter(status=Version.Status.PUBLISHING).exists():
raise NotAllowedError('Cannot delete dandisets that are currently being published.')
if dandiset.embargo_status == Dandiset.EmbargoStatus.UNEMBARGOING:
if dandiset.unembargo_in_progress:
raise DandisetUnembargoInProgressError

# Delete all versions first, so that AssetPath deletion is cascaded
Expand Down
103 changes: 73 additions & 30 deletions dandiapi/api/services/embargo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
import logging
from typing import TYPE_CHECKING

from botocore.config import Config
from django.conf import settings
from django.db import transaction
from more_itertools import chunked

from dandiapi.api.copy import copy_object_multipart
from dandiapi.api.mail import send_dandisets_to_unembargo_message
from dandiapi.api.models import Asset, AssetBlob, AuditRecord, Dandiset, Upload, Version
from dandiapi.api.mail import send_dandiset_unembargoed_message
from dandiapi.api.models import AssetBlob, AuditRecord, Dandiset, Version
from dandiapi.api.services.asset.exceptions import DandisetOwnerRequiredError
from dandiapi.api.services.exceptions import DandiError
from dandiapi.api.storage import get_boto_client
from dandiapi.api.tasks import unembargo_dandiset_task

from .exceptions import (
AssetBlobEmbargoedError,
AssetTagRemovalError,
DandisetActiveUploadsError,
DandisetNotEmbargoedError,
)

if TYPE_CHECKING:
from django.contrib.auth.models import User
from django.db.models import QuerySet
from mypy_boto3_s3 import S3Client


logger = logging.getLogger(__name__)
ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000


def _delete_asset_blob_tags(client: S3Client, blob: str):
client.delete_object_tagging(
Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
Expand All @@ -34,30 +41,68 @@ def _delete_asset_blob_tags(client: S3Client, blob: str):

# NOTE: In testing this took ~2 minutes for 100,000 files
def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset):
# First we need to generate a CSV manifest containing all asset blobs that need to be untagged
embargoed_asset_blobs = AssetBlob.objects.filter(
embargoed=True, assets__versions__dandiset=dandiset
).values_list('blob', flat=True)

client = get_boto_client(config=Config(max_pool_connections=100))
with ThreadPoolExecutor(max_workers=100) as e:
for blob in embargoed_asset_blobs:
e.submit(_delete_asset_blob_tags, client=client, blob=blob)
embargoed_asset_blobs = (
AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset)
.values_list('blob', flat=True)
.iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
)

# Chunk the blobs so we're never storing a list of all embargoed blobs
chunks = chunked(embargoed_asset_blobs, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
for chunk in chunks:
with ThreadPoolExecutor(max_workers=100) as e:
futures = [
e.submit(_delete_asset_blob_tags, client=client, blob=blob) for blob in chunk
]

# Check if any failed and raise exception if so
failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None]
if failed:
raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)


@transaction.atomic()
def _unembargo_dandiset(dandiset: Dandiset, user: User):
# NOTE: Before proceeding, all asset blobs must have their embargoed tags removed in s3
def unembargo_dandiset(ds: Dandiset, user: User):
"""Unembargo a dandiset by copying all embargoed asset blobs to the public bucket."""
logger.info('Unembargoing Dandiset %s', ds.identifier)
logger.info('\t%s assets', ds.draft_version.assets.count())

if ds.embargo_status != Dandiset.EmbargoStatus.UNEMBARGOING:
raise DandiError(
message=f'Expected dandiset state {Dandiset.EmbargoStatus.UNEMBARGOING}, found {ds.embargo_status}', # noqa: E501
http_status_code=500,
)
if ds.uploads.all().exists():
raise DandisetActiveUploadsError(http_status_code=500)

# Remove tags in S3
logger.info('Removing tags...')
_remove_dandiset_asset_blob_embargo_tags(ds)

# Update embargoed flag on asset blobs
updated = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
embargoed=False
)
logger.info('Updated %s asset blobs', updated)

# Set status to OPEN
Dandiset.objects.filter(pk=ds.pk).update(embargo_status=Dandiset.EmbargoStatus.OPEN)
logger.info('Dandiset embargo status updated')

draft_version: Version = dandiset.draft_version
embargoed_assets: QuerySet[Asset] = draft_version.assets.filter(blob__embargoed=True)
AssetBlob.objects.filter(assets__in=embargoed_assets).update(embargoed=False)
# Fetch version to ensure changed embargo_status is included
# Save version to update metadata through populate_metadata
v = Version.objects.select_for_update().get(dandiset=ds, version='draft')
v.save()
logger.info('Version metadata updated')

# Set access on dandiset
dandiset.embargo_status = Dandiset.EmbargoStatus.OPEN
dandiset.save()
# Notify owners of completed unembargo
send_dandiset_unembargoed_message(ds)
logger.info('Dandiset owners notified.')

audit_record = AuditRecord.unembargo_dandiset(dandiset=dandiset, user=user)
logger.info('...Done')

audit_record = AuditRecord.unembargo_dandiset(dandiset=ds, user=user)
audit_record.save()


Expand All @@ -69,8 +114,8 @@ def remove_asset_blob_embargoed_tag(asset_blob: AssetBlob) -> None:
_delete_asset_blob_tags(client=get_boto_client(), blob=asset_blob.blob.name)


def unembargo_dandiset(*, user: User, dandiset: Dandiset):
"""Unembargo a dandiset by copying all embargoed asset blobs to the public bucket."""
def kickoff_dandiset_unembargo(*, user: User, dandiset: Dandiset):
"""Set dandiset status to kickoff unembargo."""
if dandiset.embargo_status != Dandiset.EmbargoStatus.EMBARGOED:
raise DandisetNotEmbargoedError

Expand All @@ -81,10 +126,8 @@ def unembargo_dandiset(*, user: User, dandiset: Dandiset):
if dandiset.uploads.count():
raise DandisetActiveUploadsError

# A scheduled task will pick up any new dandisets with this status and email the admins to
# initiate the unembargo process
dandiset.embargo_status = Dandiset.EmbargoStatus.UNEMBARGOING
dandiset.save()

# Send initial email to ensure it's seen in a timely manner
send_dandisets_to_unembargo_message(dandisets=[dandiset])
# Update the status and register the task inside one transaction; the task
# is only enqueued once the status change has been committed.
with transaction.atomic():
    Dandiset.objects.filter(pk=dandiset.pk).update(
        embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING
    )
    # unembargo_dandiset_task is declared as (dandiset_id, user_id); both
    # must be passed or the task cannot be dispatched.
    transaction.on_commit(lambda: unembargo_dandiset_task.delay(dandiset.pk, user.id))
6 changes: 6 additions & 0 deletions dandiapi/api/services/embargo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ class UnauthorizedEmbargoAccessError(DandiError):
message = (
'Authentication credentials must be provided when attempting to access embargoed dandisets'
)


class AssetTagRemovalError(Exception):
    """Raised when tags could not be removed from one or more blobs.

    The blobs passed by the raiser are kept on the ``blobs`` attribute so
    handlers can inspect them.
    """

    def __init__(self, message: str, blobs: list[str]) -> None:
        # Keep the caller-supplied blob list alongside the standard message.
        self.blobs = blobs
        super().__init__(message)
26 changes: 17 additions & 9 deletions dandiapi/api/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.contrib.auth.models import User

from dandiapi.api.doi import delete_doi
from dandiapi.api.mail import send_dandiset_unembargo_failed_message
from dandiapi.api.manifests import (
write_assets_jsonld,
write_assets_yaml,
Expand All @@ -13,6 +14,7 @@
write_dandiset_yaml,
)
from dandiapi.api.models import Asset, AssetBlob, Version
from dandiapi.api.models.dandiset import Dandiset

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -70,17 +72,23 @@ def delete_doi_task(doi: str) -> None:
delete_doi(doi)


@shared_task
def unembargo_dandiset_task(dandiset_id: int, user_id: int):
from dandiapi.api.services.embargo import _unembargo_dandiset

dandiset = Dandiset.objects.get(id=dandiset_id)
user = User.objects.get(id=user_id)
_unembargo_dandiset(dandiset, user)


@shared_task
def publish_dandiset_task(dandiset_id: int, user_id: int):
    """Task wrapper that delegates to ``_publish_dandiset`` with the given ids."""
    # Imported at call time rather than module import time
    # (presumably to avoid a circular import with the services package).
    from dandiapi.api.services.publish import _publish_dandiset

    _publish_dandiset(
        dandiset_id=dandiset_id,
        user_id=user_id,
    )


@shared_task(soft_time_limit=1200)
def unembargo_dandiset_task(dandiset_id: int, user_id: int):
    """Unembargo a dandiset in a background task, emailing on failure.

    Looks up the dandiset and user by id and runs ``unembargo_dandiset``. If
    that raises, the "unembargo failed" email is sent and the original
    exception is re-raised so the task is still recorded as failed.
    """
    # Imported at call time: the embargo service imports this task at module
    # level, so a top-level import here would be circular.
    from dandiapi.api.services.embargo import unembargo_dandiset

    ds = Dandiset.objects.get(pk=dandiset_id)
    user = User.objects.get(id=user_id)

    # If the unembargo fails for any reason, send an email, but continue the error propagation
    try:
        unembargo_dandiset(ds, user)
    except Exception:
        # A failure to send the notification must not replace the unembargo
        # error, so log it and fall through to re-raise the original.
        try:
            send_dandiset_unembargo_failed_message(ds)
        except Exception:
            logger.exception(
                'Failed to send unembargo failure notification for dandiset %s',
                ds.identifier,
            )
        raise
Loading

0 comments on commit 90c8bc8

Please sign in to comment.