diff --git a/dandiapi/api/management/commands/extract_metadata.py b/dandiapi/api/management/commands/extract_metadata.py new file mode 100644 index 000000000..ce91ad4a4 --- /dev/null +++ b/dandiapi/api/management/commands/extract_metadata.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import logging +from pathlib import Path + +from dandi.dandiapi import RemoteReadableAsset +from dandi.metadata.nwb import nwb2asset +from django.contrib.auth.models import User +import djclick as click +from tqdm import tqdm + +from dandiapi.api.models import Asset, Dandiset, Version +from dandiapi.api.services.asset import change_asset + +logger = logging.getLogger(__name__) + + +@click.group() +def group(): + pass + + +def extract_asset_metadata(asset: Asset, draft_version: Version): + readable_asset = RemoteReadableAsset( + asset.s3_url, size=asset.size, mtime=asset.modified, name=Path(asset.path).name + ) + + if not asset.path.lower().endswith('.nwb'): + logger.info('Asset %s: Not an NWB file, skipping...', asset.path) + return + + new_metadata = nwb2asset(readable_asset).json_dict() + + # Use dandiset owner, default to some admin user + user = ( + draft_version.dandiset.owners.first() + or User.objects.filter(is_superuser=True, is_staff=True).first() + ) + + # Replace old asset with new asset containing updated metadata + change_asset( + user=user, + asset=asset, + version=draft_version, + new_asset_blob=asset.blob, + new_zarr_archive=asset.zarr, + new_metadata=new_metadata, + ) + + +def extract_dandiset_assets(dandiset: Dandiset): + # Only update assets which do not belong to a published version + assets = dandiset.draft_version.assets.filter( + published=False, path__iendswith='.nwb' + ).select_related('blob', 'zarr') + if not assets: + logger.info('No draft NWB assets found in dandiset %s. Skipping...', dandiset) + return + + for asset in tqdm(assets): + extract_asset_metadata(asset=asset, draft_version=dandiset.draft_version) + + +@group.command(help='Re-extracts the metadata of this asset') +@click.argument('asset_id') +def asset(asset_id: str): + asset = Asset.objects.get(asset_id=asset_id) + draft_versions = asset.versions.filter(version='draft') + if not draft_versions.exists(): + raise click.ClickException( + 'Cannot re-extract metadata of asset that has no associated draft versions.' + ) + + # Re-extract for every draft version + for version in draft_versions: + extract_asset_metadata(asset=asset, draft_version=version) + + +@group.command( + help='Re-extracts the metadata of all assets in the draft version of the provided dandiset' +) +@click.argument('dandiset_id') +def dandiset(dandiset_id: str): + dandiset = Dandiset.objects.get(id=int(dandiset_id)) + extract_dandiset_assets(dandiset) + + +@group.command(name='all', help='Re-extracts the metadata of all assets in all draft versions') +def all_dandisets(): + for dandiset in Dandiset.objects.all(): + logger.info('DANDISET: %s', dandiset.identifier) + extract_dandiset_assets(dandiset) diff --git a/dandiapi/api/services/metadata/__init__.py b/dandiapi/api/services/metadata/__init__.py index 0656916a1..f1b500133 100644 --- a/dandiapi/api/services/metadata/__init__.py +++ b/dandiapi/api/services/metadata/__init__.py @@ -1,15 +1,20 @@ from __future__ import annotations +from pathlib import Path from typing import TYPE_CHECKING from celery.utils.log import get_task_logger +from dandi.dandiapi import RemoteReadableAsset +from dandi.metadata.nwb import nwb2asset import dandischema.exceptions from dandischema.metadata import aggregate_assets_summary, validate from django.conf import settings +from django.contrib.auth.models import User from django.db import transaction from django.utils import timezone from dandiapi.api.models import Asset, Version +from dandiapi.api.services.asset import change_asset from dandiapi.api.services.metadata.exceptions import ( AssetHasBeenPublishedError, VersionHasBeenPublishedError, @@ -180,3 +185,30 @@ def _get_version_validation_result( version_qs.update(status=Version.Status.VALIDATING) status, errors = _get_version_validation_result(current_version) version_qs.update(status=status, validation_errors=errors, modified=timezone.now()) + + +def re_extract_asset_metadata(asset: Asset): + readable_asset = RemoteReadableAsset( + asset.s3_url, size=asset.size, mtime=asset.modified, name=Path(asset.path).name + ) + + if not (asset.path.endswith('.nwb') or asset.path.endswith('.NWB')): + logger.info('Asset %s: Not an NWB file, skipping...', asset.path) + return + + new_metadata = nwb2asset(readable_asset).json_dict() + + # Use dandiset owner, default to some admin user + draft_version = asset.versions.get(version='draft') + user = draft_version.dandiset.owners.first() + if user is None: + user = User.objects.filter(is_superuser=True, is_staff=True).first() + + change_asset( + user=user, + asset=asset, + version=draft_version, + new_asset_blob=asset.blob, + new_zarr_archive=asset.zarr, + new_metadata=new_metadata, + ) diff --git a/dandiapi/api/tasks/__init__.py b/dandiapi/api/tasks/__init__.py index 2b4496c74..b22161461 100644 --- a/dandiapi/api/tasks/__init__.py +++ b/dandiapi/api/tasks/__init__.py @@ -5,6 +5,7 @@ from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded from celery.utils.log import get_task_logger +from django.conf import settings from django.contrib.auth.models import User from dandiapi.api.doi import delete_doi @@ -22,6 +23,7 @@ if TYPE_CHECKING: from uuid import UUID + logger = get_task_logger(__name__) @@ -104,3 +106,22 @@ def unembargo_dandiset_task(dandiset_id: int, user_id: int): except Exception: send_dandiset_unembargo_failed_message(ds) raise + + +@shared_task +def dispatch_assets_to_re_extract(): + assets_to_migrate = Asset.objects.prefetch_related('versions').filter( + metadata__schemaVersion__lt=settings.DANDI_SCHEMA_VERSION, + versions__version='draft', + ) + + for asset in assets_to_migrate: + re_extract_asset_metadata_task.delay(asset_id=asset.id) + + +@shared_task +def re_extract_asset_metadata_task(asset_id: int): + from dandiapi.api.services.metadata import re_extract_asset_metadata + + asset = Asset.objects.get(id=asset_id) + re_extract_asset_metadata(asset=asset) diff --git a/setup.py b/setup.py index e7db9cc0a..7c1023a96 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ include_package_data=True, install_requires=[ 'celery', + 'dandi', # Pin dandischema to exact version to make explicit which schema version is being used 'dandischema==0.10.2', # schema version 0.6.8 'django~=4.1.0', @@ -59,6 +60,7 @@ 'djangorestframework-yaml', 'drf-extensions', 'drf-yasg', + 'fsspec[http]', 'jsonschema', 'boto3[s3]', 'more_itertools',