Skip to content

Commit

Permalink
Chunk asset blob list in _remove_dandiset_asset_blob_embargo_tags
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnesbitt committed Jul 19, 2024
1 parent 720018b commit 566295a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 18 deletions.
36 changes: 19 additions & 17 deletions dandiapi/api/services/embargo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from botocore.config import Config
from django.conf import settings
from django.db import transaction
from more_itertools import chunked

from dandiapi.api.mail import send_dandiset_unembargoed_message
from dandiapi.api.models import AssetBlob, Dandiset, Version
Expand All @@ -28,6 +29,7 @@


logger = logging.getLogger(__name__)
ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000


def _delete_asset_blob_tags(client: S3Client, blob: str):
Expand All @@ -39,25 +41,25 @@ def _delete_asset_blob_tags(client: S3Client, blob: str):

# NOTE: In testing this took ~2 minutes for 100,000 files
def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset):
embargoed_asset_blobs = list(
AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset).values_list(
'blob', flat=True
)
client = get_boto_client(config=Config(max_pool_connections=100))
embargoed_asset_blobs = (
AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset)
.values_list('blob', flat=True)
.iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
)

client = get_boto_client(config=Config(max_pool_connections=100))
with ThreadPoolExecutor(max_workers=100) as e:
futures = [
e.submit(_delete_asset_blob_tags, client=client, blob=blob)
for blob in embargoed_asset_blobs
]

# Check if any failed and raise exception if so
failed = [
blob for i, blob in enumerate(embargoed_asset_blobs) if futures[i].exception() is not None
]
if failed:
raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)
# Chunk the blobs so we're never storing a list of all embargoed blobs
chunks = chunked(embargoed_asset_blobs, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
for chunk in chunks:
with ThreadPoolExecutor(max_workers=100) as e:
futures = [
e.submit(_delete_asset_blob_tags, client=client, blob=blob) for blob in chunk
]

# Check if any failed and raise exception if so
failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None]
if failed:
raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)


@transaction.atomic()
Expand Down
54 changes: 53 additions & 1 deletion dandiapi/api/tests/test_unembargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
from dandiapi.api.models.dandiset import Dandiset
from dandiapi.api.services.embargo import (
AssetBlobEmbargoedError,
_remove_dandiset_asset_blob_embargo_tags,
remove_asset_blob_embargoed_tag,
unembargo_dandiset,
)
from dandiapi.api.services.embargo.exceptions import DandisetActiveUploadsError
from dandiapi.api.services.embargo.exceptions import (
AssetTagRemovalError,
DandisetActiveUploadsError,
)
from dandiapi.api.services.exceptions import DandiError
from dandiapi.api.tasks import unembargo_dandiset_task

Expand Down Expand Up @@ -114,6 +118,54 @@ def test_unembargo_dandiset_uploads_exist(draft_version_factory, upload_factory)
unembargo_dandiset(ds)


@pytest.mark.django_db()
def test_remove_dandiset_asset_blob_embargo_tags_chunks(
draft_version_factory,
asset_factory,
embargoed_asset_blob_factory,
mocker,
):
delete_asset_blob_tags_mock = mocker.patch(
'dandiapi.api.services.embargo._delete_asset_blob_tags'
)
chunk_size = mocker.patch('dandiapi.api.services.embargo.ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE', 2)

draft_version: Version = draft_version_factory(
dandiset__embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING
)
ds: Dandiset = draft_version.dandiset
for _ in range(chunk_size + 1):
asset = asset_factory(blob=embargoed_asset_blob_factory())
draft_version.assets.add(asset)

_remove_dandiset_asset_blob_embargo_tags(dandiset=ds)

# Assert that _delete_asset_blob_tags was called chunk_size +1 times, to ensure that it works
# correctly across chunks
assert len(delete_asset_blob_tags_mock.mock_calls) == chunk_size + 1


@pytest.mark.django_db()
def test_delete_asset_blob_tags_fails(
draft_version_factory,
asset_factory,
embargoed_asset_blob_factory,
mocker,
):
mocker.patch('dandiapi.api.services.embargo._delete_asset_blob_tags', side_effect=ValueError)
draft_version: Version = draft_version_factory(
dandiset__embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING
)
ds: Dandiset = draft_version.dandiset
asset = asset_factory(blob=embargoed_asset_blob_factory())
draft_version.assets.add(asset)

# Check that if an exception within `_delete_asset_blob_tags` is raised, it's propagated upwards
# as an AssetTagRemovalError
with pytest.raises(AssetTagRemovalError):
_remove_dandiset_asset_blob_embargo_tags(dandiset=ds)


@pytest.mark.django_db()
def test_unembargo_dandiset(
draft_version_factory,
Expand Down

0 comments on commit 566295a

Please sign in to comment.