Rip out embargoed models
jjnesbitt committed Apr 1, 2024
1 parent 22d819d commit 9fc81a4
Showing 32 changed files with 555 additions and 696 deletions.
12 changes: 5 additions & 7 deletions dandiapi/analytics/tasks/__init__.py
@@ -13,8 +13,8 @@
 from s3logparse import s3logparse

 from dandiapi.analytics.models import ProcessedS3Log
-from dandiapi.api.models.asset import AssetBlob, EmbargoedAssetBlob
-from dandiapi.api.storage import get_boto_client, get_embargo_storage, get_storage
+from dandiapi.api.models.asset import AssetBlob
+from dandiapi.api.storage import get_boto_client, get_storage

 if TYPE_CHECKING:
     from collections.abc import Generator
@@ -31,8 +31,7 @@ def _bucket_objects_after(bucket: str, after: str | None) -> Generator[dict, None, None]:
         settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
     }:
         raise ValueError(f'Non-log bucket: {bucket}')
-    embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
-    s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
+    s3 = get_boto_client(get_storage())
     kwargs = {}
     if after:
         kwargs['StartAfter'] = after
@@ -80,8 +79,7 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
     if ProcessedS3Log.objects.filter(name=s3_log_key.split('/')[-1], embargoed=embargoed).exists():
         return

-    s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
-    BlobModel = AssetBlob if not embargoed else EmbargoedAssetBlob  # noqa: N806
+    s3 = get_boto_client(get_storage())
     data = s3.get_object(Bucket=bucket, Key=s3_log_key)
     download_counts = Counter()

@@ -107,6 +105,6 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
     # multiple log files trying to update the same blobs. this serialization is enforced through
     # the task queue configuration.
     for blob, download_count in download_counts.items():
-        BlobModel.objects.filter(blob=blob).update(
+        AssetBlob.objects.filter(blob=blob).update(
             download_count=F('download_count') + download_count
         )
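
With the separate EmbargoedAssetBlob model and embargo storage gone, the log-processing task resolves download counts against a single AssetBlob table. A minimal sketch of the resulting query pattern (illustration only; bump_download_count is a hypothetical helper, not part of this commit, and embargo status is simply the embargoed flag that the migration later in this diff adds to AssetBlob):

from django.db.models import F

from dandiapi.api.models import AssetBlob

def bump_download_count(blob_key: str, count: int) -> None:
    # One queryset covers both embargoed and open blobs, since they now share the model.
    AssetBlob.objects.filter(blob=blob_key).update(
        download_count=F('download_count') + count
    )

# Embargoed blobs are selected by flag rather than by a separate model or bucket:
embargoed_blobs = AssetBlob.objects.filter(embargoed=True)
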
19 changes: 1 addition & 18 deletions dandiapi/api/admin.py
@@ -20,7 +20,6 @@
     Asset,
     AssetBlob,
     Dandiset,
-    EmbargoedAssetBlob,
     Upload,
     UserMetadata,
     Version,
@@ -188,33 +187,17 @@ def get_queryset(self, request):
         return super().get_queryset(request).prefetch_related('assets')


-@admin.register(EmbargoedAssetBlob)
-class EmbargoedAssetBlobAdmin(AssetBlobAdmin):
-    list_display = [
-        'id',
-        'blob_id',
-        'dandiset',
-        'blob',
-        'references',
-        'size',
-        'sha256',
-        'modified',
-        'created',
-    ]
-
-
 class AssetBlobInline(LimitedTabularInline):
     model = AssetBlob


 @admin.register(Asset)
 class AssetAdmin(admin.ModelAdmin):
-    autocomplete_fields = ['blob', 'embargoed_blob', 'zarr', 'versions']
+    autocomplete_fields = ['blob', 'zarr', 'versions']
     fields = [
         'asset_id',
         'path',
         'blob',
-        'embargoed_blob',
         'zarr',
         'metadata',
         'versions',
27 changes: 5 additions & 22 deletions dandiapi/api/asset_paths.py
@@ -37,12 +37,7 @@ def get_root_paths_many(versions: QuerySet[Version], *, join_assets=False) -> QuerySet[AssetPath]:
     # Use prefetch_related here instead of select_related,
     # as otherwise the resulting join is very large
     if join_assets:
-        qs = qs.prefetch_related(
-            'asset',
-            'asset__blob',
-            'asset__embargoed_blob',
-            'asset__zarr',
-        )
+        qs = qs.prefetch_related('asset', 'asset__blob', 'asset__zarr')

     return qs.filter(version__in=versions).exclude(path__contains='/').order_by('path')

@@ -51,12 +46,7 @@ def get_root_paths(version: Version) -> QuerySet[AssetPath]:
     """Return all root paths for a version."""
     # Use prefetch_related here instead of select_related,
     # as otherwise the resulting join is very large
-    qs = AssetPath.objects.prefetch_related(
-        'asset',
-        'asset__blob',
-        'asset__embargoed_blob',
-        'asset__zarr',
-    )
+    qs = AssetPath.objects.prefetch_related('asset', 'asset__blob', 'asset__zarr')
     return qs.filter(version=version).exclude(path__contains='/').order_by('path')


@@ -73,12 +63,7 @@ def get_path_children(path: AssetPath, depth: int | None = 1) -> QuerySet[AssetPath]:

     path_ids = relation_qs.values_list('child', flat=True).distinct()
     return (
-        AssetPath.objects.select_related(
-            'asset',
-            'asset__blob',
-            'asset__embargoed_blob',
-            'asset__zarr',
-        )
+        AssetPath.objects.select_related('asset', 'asset__blob', 'asset__zarr')
         .filter(id__in=path_ids)
         .order_by('path')
     )
@@ -258,13 +243,11 @@ def add_version_asset_paths(version: Version):

         # Get all aggregates
         sizes = child_leaves.aggregate(
-            size=Coalesce(Sum('asset__blob__size'), 0),
-            esize=Coalesce(Sum('asset__embargoed_blob__size'), 0),
-            zsize=Coalesce(Sum('asset__zarr__size'), 0),
+            size=Coalesce(Sum('asset__blob__size'), 0), zsize=Coalesce(Sum('asset__zarr__size'), 0)
         )

         node.aggregate_files += child_leaves.count()
-        node.aggregate_size += sizes['size'] + sizes['esize'] + sizes['zsize']
+        node.aggregate_size += sizes['size'] + sizes['zsize']
         node.save()

8 changes: 4 additions & 4 deletions dandiapi/api/migrations/0001_initial_v2.py
@@ -379,8 +379,8 @@ class Migration(migrations.Migration):
                     'blob',
                     models.FileField(
                         blank=True,
-                        storage=dandiapi.api.storage.get_embargo_storage,
-                        upload_to=dandiapi.api.storage.get_embargo_storage_prefix,
+                        storage=dandiapi.api.storage.get_storage,
+                        upload_to=dandiapi.api.storage.get_storage_prefix,
                     ),
                 ),
                 (
@@ -442,8 +442,8 @@ class Migration(migrations.Migration):
                     'blob',
                     models.FileField(
                         blank=True,
-                        storage=dandiapi.api.storage.get_embargo_storage,
-                        upload_to=dandiapi.api.storage.get_embargo_storage_prefix,
+                        storage=dandiapi.api.storage.get_storage,
+                        upload_to=dandiapi.api.storage.get_storage_prefix,
                     ),
                 ),
                 (
@@ -0,0 +1,91 @@
# Generated by Django 4.1.13 on 2024-01-16 18:31
from __future__ import annotations

from django.db import migrations, models


def migrate_embargoed_asset_blobs(apps, _):
    Asset = apps.get_model('api.Asset')
    AssetBlob = apps.get_model('api.AssetBlob')

    # We must include a filter to only include assets which are part of a version, as otherwise
    # we may include an embargoed asset which has been previously updated. Since updating an asset
    # results in a new asset which points to the same blob as the original asset, this would copy
    # the same blob twice, resulting in an integrity error (same blob_id).
    embargoed_assets = Asset.objects.filter(
        embargoed_blob__isnull=False, versions__isnull=False
    ).select_related('embargoed_blob')

    # For each relevant asset, create a new asset blob with embargoed=True,
    # and point the asset to that
    for asset in embargoed_assets.iterator():
        blob_id = str(asset.embargoed_blob.blob_id)
        new_blob_location = f'blobs/{blob_id[0:3]}/{blob_id[3:6]}/{blob_id}'
        new_asset_blob = AssetBlob.objects.create(
            embargoed=True,
            blob=new_blob_location,
            blob_id=asset.embargoed_blob.blob_id,
            created=asset.embargoed_blob.created,
            modified=asset.embargoed_blob.modified,
            sha256=asset.embargoed_blob.sha256,
            etag=asset.embargoed_blob.etag,
            size=asset.embargoed_blob.size,
            download_count=asset.embargoed_blob.download_count,
        )
        asset.blob = new_asset_blob
        asset.embargoed_blob = None
        asset.save()


class Migration(migrations.Migration):
    dependencies = [
        ('api', '0007_alter_asset_options_alter_version_options'),
    ]

    operations = [
        migrations.AddField(
            model_name='assetblob',
            name='embargoed',
            field=models.BooleanField(default=False),
        ),
        migrations.AddField(
            model_name='upload',
            name='embargoed',
            field=models.BooleanField(default=False),
        ),
        # Migrate all embargoedassetblobs and embargoeduploads to other models with embargoed=True
        migrations.RunPython(migrate_embargoed_asset_blobs),
        migrations.RemoveField(
            model_name='embargoedassetblob',
            name='dandiset',
        ),
        migrations.RemoveField(
            model_name='embargoedupload',
            name='dandiset',
        ),
        migrations.RemoveConstraint(
            model_name='asset',
            name='exactly-one-blob',
        ),
        migrations.RemoveField(
            model_name='asset',
            name='embargoed_blob',
        ),
        migrations.AddConstraint(
            model_name='asset',
            constraint=models.CheckConstraint(
                check=models.Q(
                    models.Q(('blob__isnull', True), ('zarr__isnull', False)),
                    models.Q(('blob__isnull', False), ('zarr__isnull', True)),
                    _connector='OR',
                ),
                name='blob-xor-zarr',
            ),
        ),
        migrations.DeleteModel(
            name='EmbargoedAssetBlob',
        ),
        migrations.DeleteModel(
            name='EmbargoedUpload',
        ),
    ]
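
As a quick illustration of the blob key layout used by migrate_embargoed_asset_blobs above (the blob_id here is hypothetical, chosen only for the example):

blob_id = '0b5eb2e9-1a2b-4c3d-8e9f-0123456789ab'
new_blob_location = f'blobs/{blob_id[0:3]}/{blob_id[3:6]}/{blob_id}'
# -> 'blobs/0b5/eb2/0b5eb2e9-1a2b-4c3d-8e9f-0123456789ab'
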
6 changes: 2 additions & 4 deletions dandiapi/api/models/__init__.py
@@ -1,10 +1,10 @@
 from __future__ import annotations

-from .asset import Asset, AssetBlob, EmbargoedAssetBlob
+from .asset import Asset, AssetBlob
 from .asset_paths import AssetPath, AssetPathRelation
 from .dandiset import Dandiset
 from .oauth import StagingApplication
-from .upload import EmbargoedUpload, Upload
+from .upload import Upload
 from .user import UserMetadata
 from .version import Version

@@ -14,8 +14,6 @@
     'AssetPath',
     'AssetPathRelation',
     'Dandiset',
-    'EmbargoedAssetBlob',
-    'EmbargoedUpload',
     'StagingApplication',
     'Upload',
     'UserMetadata',
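
Taken together, the migration above swaps the old exactly-one-blob constraint for blob-xor-zarr: an Asset must reference either an AssetBlob (embargoed or not) or a zarr, never both and never neither. A plain-Python sketch of the predicate that check enforces (not part of the codebase, purely an illustration):

def satisfies_blob_xor_zarr(has_blob: bool, has_zarr: bool) -> bool:
    # Mirrors the CheckConstraint: (blob is null AND zarr is not null)
    # OR (blob is not null AND zarr is null).
    return (not has_blob and has_zarr) or (has_blob and not has_zarr)

assert satisfies_blob_xor_zarr(has_blob=True, has_zarr=False)       # blob-backed asset
assert satisfies_blob_xor_zarr(has_blob=False, has_zarr=True)       # zarr-backed asset
assert not satisfies_blob_xor_zarr(has_blob=False, has_zarr=False)  # neither: violates the constraint
assert not satisfies_blob_xor_zarr(has_blob=True, has_zarr=True)    # both: violates the constraint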