diff --git a/dandiapi/api/services/metadata/__init__.py b/dandiapi/api/services/metadata/__init__.py
index 0656916a1..9a2467e1b 100644
--- a/dandiapi/api/services/metadata/__init__.py
+++ b/dandiapi/api/services/metadata/__init__.py
@@ -1,15 +1,20 @@
 from __future__ import annotations
 
+from pathlib import Path
 from typing import TYPE_CHECKING
 
 from celery.utils.log import get_task_logger
+from dandi.dandiapi import RemoteReadableAsset
+from dandi.metadata import nwb2asset
 import dandischema.exceptions
 from dandischema.metadata import aggregate_assets_summary, validate
 from django.conf import settings
+from django.contrib.auth.models import User
 from django.db import transaction
 from django.utils import timezone
 
 from dandiapi.api.models import Asset, Version
+from dandiapi.api.services.asset import change_asset
 from dandiapi.api.services.metadata.exceptions import (
     AssetHasBeenPublishedError,
     VersionHasBeenPublishedError,
@@ -180,3 +185,38 @@ def _get_version_validation_result(
         version_qs.update(status=Version.Status.VALIDATING)
         status, errors = _get_version_validation_result(current_version)
         version_qs.update(status=status, validation_errors=errors, modified=timezone.now())
+
+
+def re_extract_asset_metadata(asset: Asset) -> None:
+    """
+    Re-extract the metadata of an NWB asset and apply it through ``change_asset``.
+
+    Assets whose path does not end in ``.nwb`` or ``.NWB`` are logged and skipped.
+    """
+    # Check the extension first so skipped assets never construct a RemoteReadableAsset.
+    if not asset.path.endswith(('.nwb', '.NWB')):
+        logger.info('Asset %s: Not an NWB file, skipping...', asset.path)
+        return
+
+    readable_asset = RemoteReadableAsset(
+        asset.s3_url, size=asset.size, mtime=asset.modified, name=Path(asset.path).name
+    )
+    new_metadata = nwb2asset(readable_asset).json_dict()
+
+    # Use dandiset owner, default to some admin user
+    # NOTE(review): .get() presumably raises if the asset has no draft version -- confirm.
+    draft_version = asset.versions.get(version='draft')
+    user = draft_version.dandiset.owners.first()
+    if user is None:
+        user = User.objects.filter(is_superuser=True, is_staff=True).first()
+    # NOTE(review): user can still be None here if no such admin exists; verify that
+    # change_asset tolerates that.
+
+    change_asset(
+        user=user,
+        asset=asset,
+        version=draft_version,
+        new_asset_blob=asset.blob,
+        new_zarr_archive=asset.zarr,
+        new_metadata=new_metadata,
+    )
diff --git 
a/dandiapi/api/tasks/__init__.py b/dandiapi/api/tasks/__init__.py
index 2b4496c74..3e74d424c 100644
--- a/dandiapi/api/tasks/__init__.py
+++ b/dandiapi/api/tasks/__init__.py
@@ -5,6 +5,7 @@
 from celery import shared_task
 from celery.exceptions import SoftTimeLimitExceeded
 from celery.utils.log import get_task_logger
+from django.conf import settings
 from django.contrib.auth.models import User
 
 from dandiapi.api.doi import delete_doi
@@ -21,6 +22,7 @@
 if TYPE_CHECKING:
     from uuid import UUID
 
+from dandiapi.api.services.metadata import re_extract_asset_metadata
 
 logger = get_task_logger(__name__)
 
@@ -104,3 +106,32 @@ def unembargo_dandiset_task(dandiset_id: int, user_id: int):
     except Exception:
         send_dandiset_unembargo_failed_message(ds)
         raise
+
+
+@shared_task
+def dispatch_assets_to_re_extract() -> None:
+    """
+    Queue a re-extraction task for each draft asset not at the current schema version.
+
+    Only asset IDs are fetched, since that is all the per-asset task needs.
+    """
+    # Match on inequality rather than ``__lt``: schemaVersion is presumably a version
+    # string, and an ordering comparison on strings is lexicographic, so it would treat
+    # e.g. '0.6.4' as newer than '0.6.10'.
+    asset_ids = (
+        Asset.objects.filter(versions__version='draft')
+        .exclude(metadata__schemaVersion=settings.DANDI_SCHEMA_VERSION)
+        .values_list('id', flat=True)
+        # The join on versions can yield the same asset more than once.
+        .distinct()
+    )
+
+    for asset_id in asset_ids.iterator():
+        re_extract_asset_metadata_task.delay(asset_id=asset_id)
+
+
+@shared_task
+def re_extract_asset_metadata_task(asset_id: int) -> None:
+    """Load the asset with the given ID and re-extract its metadata."""
+    asset = Asset.objects.get(id=asset_id)
+    re_extract_asset_metadata(asset=asset)