Skip to content

Commit

Permalink
Merge pull request #1431 from dandi/asynchronous-asset-summaries
Browse files Browse the repository at this point in the history
  • Loading branch information
danlamanna committed Jan 30, 2023
2 parents 5560c74 + 3d95ec5 commit c45a09a
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 80 deletions.
81 changes: 14 additions & 67 deletions dandiapi/api/models/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import datetime
import logging
from typing import TYPE_CHECKING

from dandischema.metadata import aggregate_assets_summary
from django.conf import settings
from django.contrib.postgres.indexes import HashIndex
from django.core.validators import RegexValidator
Expand All @@ -16,9 +14,6 @@

from .dandiset import Dandiset

if TYPE_CHECKING:
from .asset import Asset

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -145,32 +140,6 @@ def next_published_version(cls, dandiset: Dandiset) -> str:

return version

@property
def publish_version(self):
    """
    Build the published counterpart of this version without saving it.

    Because nothing is persisted here, the result can be used to validate what the
    published metadata would look like.
    """
    candidate = Version(
        dandiset=self.dandiset,
        name=self.name,
        metadata=self.metadata,
        status=Version.Status.VALID,
        version=Version.next_published_version(self.dandiset),
    )

    # A single timestamp feeds both publishedBy and datePublished.
    published_at = datetime.datetime.now(datetime.timezone.utc)

    # Recompute the metadata (looking assets up through this version), then layer the
    # publication fields on top of a fresh dict.
    enriched = dict(candidate._populate_metadata(version_with_assets=self))
    enriched['publishedBy'] = self.published_by(published_at)
    enriched['datePublished'] = published_at.isoformat()
    candidate.metadata = enriched

    return candidate

@classmethod
def citation(cls, metadata):
year = datetime.datetime.now().year
Expand Down Expand Up @@ -215,58 +184,36 @@ def strip_metadata(cls, metadata):
]
return {key: metadata[key] for key in metadata if key not in computed_fields}

def _populate_metadata(self, version_with_assets: Version | None = None):

# When validating a draft version, we create a published version without saving it,
# calculate its metadata, and validate that metadata. However, assetsSummary is computed
# based on the assets that belong to the dummy published version, which has not had assets
# copied to it yet. To get around this, version_with_assets is the draft version that
# should be used to look up the assets for the assetsSummary.
if version_with_assets is None:
version_with_assets = self

# When running _populate_metadata on an unsaved Version, self.assets is not available.
# Only compute the asset-based properties if this Version has an id, which means it's saved.
summary = {
'numberOfBytes': 0,
'numberOfFiles': 0,
}
if version_with_assets.id:
try:
assets: models.QuerySet[Asset] = version_with_assets.assets
summary = aggregate_assets_summary(
# There is no limit to how many assets a dandiset can have, so use
# `values_list` and `iterator` here to keep the memory footprint
# of this list low.
assets.values_list('metadata', flat=True).iterator()
)
except Exception:
# The assets summary aggregation may fail if any asset metadata is invalid.
# If so, just use the placeholder summary.
logger.info('Error calculating assetsSummary', exc_info=True)

# Import here to avoid dependency cycle
def _populate_metadata(self):
from dandiapi.api.manifests import manifest_location

metadata = {
**self.metadata,
'@context': (
'https://raw.githubusercontent.com/dandi/schema/master/releases/'
f'{self.metadata["schemaVersion"]}/context.json'
),
'manifestLocation': manifest_location(self),
'name': self.name,
'identifier': f'DANDI:{self.dandiset.identifier}',
'version': self.version,
'id': f'DANDI:{self.dandiset.identifier}/{self.version}',
'repository': settings.DANDI_WEB_APP_URL,
'url': f'{settings.DANDI_WEB_APP_URL}/dandiset/{self.dandiset.identifier}/{self.version}', # noqa
'assetsSummary': summary,
'dateCreated': self.dandiset.created.isoformat(),
}

if 'assetsSummary' not in metadata:
metadata['assetsSummary'] = {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
}

if self.doi:
metadata['doi'] = self.doi
metadata['citation'] = self.citation(metadata)
metadata['@context'] = (
'https://raw.githubusercontent.com/dandi/schema/master/releases/'
f'{metadata["schemaVersion"]}/context.json'
)

return metadata

def save(self, *args, **kwargs):
Expand Down
32 changes: 24 additions & 8 deletions dandiapi/api/services/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from celery.utils.log import get_task_logger
import dandischema.exceptions
from dandischema.metadata import validate
from dandischema.metadata import aggregate_assets_summary, validate
from django.db import transaction
from django.utils import timezone
import jsonschema.exceptions
Expand Down Expand Up @@ -67,6 +67,21 @@ def validate_asset_metadata(*, asset: Asset) -> None:
asset.versions.filter(version='draft').update(modified=timezone.now())


def version_aggregate_assets_summary(version: Version):
    """
    Recompute ``assetsSummary`` for a draft version and persist it.

    The summary is aggregated from the metadata of the version's assets, stored under
    ``version.metadata['assetsSummary']``, and written with a queryset ``update`` that
    also bumps ``modified``. The instance is then reloaded from the database and returned.

    Raises VersionHasBeenPublished if ``version`` is not the draft version.
    """
    if version.version != 'draft':
        raise VersionHasBeenPublished()

    # Stream the asset metadata with ``iterator()`` rather than loading it all at once.
    asset_metadata = version.assets.values_list('metadata', flat=True).iterator()
    version.metadata['assetsSummary'] = aggregate_assets_summary(asset_metadata)

    # The filter repeats the draft check at the database level.
    draft_rows = Version.objects.filter(id=version.id, version='draft')
    draft_rows.update(modified=timezone.now(), metadata=version.metadata)

    version.refresh_from_db()
    return version


def validate_version_metadata(*, version: Version) -> None:
logger.info('Validating dandiset metadata for version %s', version.id)

Expand All @@ -75,17 +90,18 @@ def validate_version_metadata(*, version: Version) -> None:
raise VersionHasBeenPublished()

with transaction.atomic():
# validating version metadata needs to lock the version to avoid racing with
# other modifications e.g. aggregate_assets_summary.
version = (
Version.objects.filter(id=version.id, status=Version.Status.PENDING)
.select_for_update()
.first()
)
version.status = Version.Status.VALIDATING
version.save()

try:
publish_version = version.publish_version
metadata = publish_version.metadata

# Inject a dummy DOI so the metadata is valid
metadata['doi'] = '10.80507/dandi.123456/0.123456.1234'

validate(metadata, schema_key='PublishedDandiset', json_validation=True)
validate(version.metadata, schema_key='Dandiset', json_validation=True)
except dandischema.exceptions.ValidationError as e:
logger.info('Error while validating version %s', version.id)
version.status = Version.Status.INVALID
Expand Down
35 changes: 33 additions & 2 deletions dandiapi/api/services/publish/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import datetime

from dandischema.metadata import aggregate_assets_summary, validate
from django.contrib.auth.models import User
from django.db import transaction
from django.db.models import QuerySet
Expand Down Expand Up @@ -76,6 +79,27 @@ def _lock_dandiset_for_publishing(*, user: User, dandiset: Dandiset) -> None:
draft_version.save()


def _build_publishable_version_from_draft(draft_version: Version) -> Version:
    """
    Build a publishable Version from a draft version without saving it.

    The new version takes the draft's dandiset and name, the next published version
    string, and a copy of the draft's metadata with ``publishedBy`` and
    ``datePublished`` injected.

    The draft's own metadata dict is not modified: the publication fields are added to
    a separate dict, so they cannot leak into the draft if it is saved afterwards.
    """
    publishable_version = Version(
        dandiset=draft_version.dandiset,
        name=draft_version.name,
        metadata=draft_version.metadata,
        status=Version.Status.VALID,
        version=Version.next_published_version(draft_version.dandiset),
    )

    now = datetime.datetime.now(datetime.timezone.utc)
    # Inject the publishedBy and datePublished fields into a new top-level dict instead
    # of calling ``.update()`` on the dict that is still shared with the draft version.
    publishable_version.metadata = {
        **draft_version.metadata,
        'publishedBy': draft_version.published_by(now),
        'datePublished': now.isoformat(),
    }

    return publishable_version


def _publish_dandiset(dandiset_id: int) -> None:
"""
Publish a dandiset.
Expand All @@ -93,7 +117,7 @@ def _publish_dandiset(dandiset_id: int) -> None:
'before this function.'
)

new_version: Version = old_version.publish_version
new_version: Version = _build_publishable_version_from_draft(old_version)
new_version.save()

# Bulk create the join table rows to optimize linking assets to new_version
Expand Down Expand Up @@ -130,7 +154,9 @@ def _publish_dandiset(dandiset_id: int) -> None:
for draft_asset in draft_assets.iterator():
publish_asset(asset=draft_asset)

# Save again to recompute metadata, specifically assetsSummary
new_version.metadata['assetsSummary'] = aggregate_assets_summary(
new_version.assets.values_list('metadata', flat=True).iterator()
)
new_version.save()

# Add asset paths with new version
Expand All @@ -141,6 +167,11 @@ def _publish_dandiset(dandiset_id: int) -> None:
old_version.status = Version.Status.PUBLISHED
old_version.save()

# Inject a dummy DOI so the metadata is valid
new_version.metadata['doi'] = '10.80507/dandi.123456/0.123456.1234'

validate(new_version.metadata, schema_key='PublishedDandiset', json_validation=True)

# Write updated manifest files and create DOI after
# published version has been committed to DB.
transaction.on_commit(lambda: write_manifest_files.delay(new_version.id))
Expand Down
10 changes: 8 additions & 2 deletions dandiapi/api/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.auth.models import User
from django.db.transaction import atomic

from dandiapi.api.mail import send_pending_users_message
from dandiapi.api.models import UserMetadata, Version
from dandiapi.api.services.metadata import version_aggregate_assets_summary
from dandiapi.api.tasks import validate_version_metadata_task, write_manifest_files

logger = get_task_logger(__name__)


@shared_task(soft_time_limit=10)
def aggregate_assets_summary_task(version_id: int):
    """Look up the version with the given id and recompute its assets summary."""
    version_aggregate_assets_summary(Version.objects.get(id=version_id))


@shared_task(soft_time_limit=20)
@atomic
def validate_draft_version_metadata():
# Select only the id of draft versions that have status PENDING
pending_draft_versions = (
Expand All @@ -34,6 +39,7 @@ def validate_draft_version_metadata():
logger.info('Found %s versions to validate', pending_draft_versions_count)
for draft_version_id in pending_draft_versions.iterator():
validate_version_metadata_task.delay(draft_version_id)
aggregate_assets_summary_task.delay(draft_version_id)

# Revalidation should be triggered every time a version is modified,
# so now is a good time to write out the manifests as well.
Expand Down
4 changes: 4 additions & 0 deletions dandiapi/api/tests/test_dandiset.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def test_dandiset_rest_create(api_client, user):
}
],
'assetsSummary': {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
},
Expand Down Expand Up @@ -480,6 +481,7 @@ def test_dandiset_rest_create_with_identifier(api_client, admin_user):
}
],
'assetsSummary': {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
},
Expand Down Expand Up @@ -585,6 +587,7 @@ def test_dandiset_rest_create_with_contributor(api_client, admin_user):
}
],
'assetsSummary': {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
},
Expand Down Expand Up @@ -675,6 +678,7 @@ def test_dandiset_rest_create_embargoed(api_client, user):
}
],
'assetsSummary': {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
},
Expand Down
4 changes: 3 additions & 1 deletion dandiapi/api/tests/test_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from dandiapi.api import tasks
from dandiapi.api.models import Asset, Version
from dandiapi.api.services.publish import _build_publishable_version_from_draft
from dandiapi.zarr.tasks import ingest_zarr_archive

from .fuzzy import TIMESTAMP_RE, URN_RE, UTC_ISO_TIMESTAMP_RE, VERSION_ID_RE
Expand Down Expand Up @@ -279,7 +280,7 @@ def test_version_publish_version(draft_version, asset):
draft_version.assets.add(asset)
draft_version.save()

publish_version = draft_version.publish_version
publish_version = _build_publishable_version_from_draft(draft_version)
publish_version.doi = fake_doi
publish_version.save()

Expand Down Expand Up @@ -320,6 +321,7 @@ def test_version_publish_version(draft_version, asset):
# The published_version cannot have a properly defined assetsSummary yet, since that would
# require having created rows in the Asset-to-Version join table, which is a side effect.
'assetsSummary': {
'schemaKey': 'AssetsSummary',
'numberOfBytes': 0,
'numberOfFiles': 0,
},
Expand Down

0 comments on commit c45a09a

Please sign in to comment.