diff --git a/dandiapi/api/services/embargo/__init__.py b/dandiapi/api/services/embargo/__init__.py index b4c2c1aab..77c31d048 100644 --- a/dandiapi/api/services/embargo/__init__.py +++ b/dandiapi/api/services/embargo/__init__.py @@ -7,6 +7,7 @@ from botocore.config import Config from django.conf import settings from django.db import transaction +from more_itertools import chunked from dandiapi.api.mail import send_dandiset_unembargoed_message from dandiapi.api.models import AssetBlob, Dandiset, Version @@ -28,6 +29,7 @@ logger = logging.getLogger(__name__) +ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000 def _delete_asset_blob_tags(client: S3Client, blob: str): @@ -39,25 +41,25 @@ def _delete_asset_blob_tags(client: S3Client, blob: str): # NOTE: In testing this took ~2 minutes for 100,000 files def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset): - embargoed_asset_blobs = list( - AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset).values_list( - 'blob', flat=True - ) + client = get_boto_client(config=Config(max_pool_connections=100)) + embargoed_asset_blobs = ( + AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset) + .values_list('blob', flat=True) + .iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE) ) - client = get_boto_client(config=Config(max_pool_connections=100)) - with ThreadPoolExecutor(max_workers=100) as e: - futures = [ - e.submit(_delete_asset_blob_tags, client=client, blob=blob) - for blob in embargoed_asset_blobs - ] - - # Check if any failed and raise exception if so - failed = [ - blob for i, blob in enumerate(embargoed_asset_blobs) if futures[i].exception() is not None - ] - if failed: - raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed) + # Chunk the blobs so we're never storing a list of all embargoed blobs + chunks = chunked(embargoed_asset_blobs, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE) + for chunk in chunks: + with ThreadPoolExecutor(max_workers=100) as e: + 
futures = [ + e.submit(_delete_asset_blob_tags, client=client, blob=blob) for blob in chunk + ] + + # Check if any failed and raise exception if so + failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None] + if failed: + raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed) @transaction.atomic() diff --git a/dandiapi/api/tests/test_unembargo.py b/dandiapi/api/tests/test_unembargo.py index 9740ecebf..0d6eff3e8 100644 --- a/dandiapi/api/tests/test_unembargo.py +++ b/dandiapi/api/tests/test_unembargo.py @@ -9,10 +9,14 @@ from dandiapi.api.models.dandiset import Dandiset from dandiapi.api.services.embargo import ( AssetBlobEmbargoedError, + _remove_dandiset_asset_blob_embargo_tags, remove_asset_blob_embargoed_tag, unembargo_dandiset, ) -from dandiapi.api.services.embargo.exceptions import DandisetActiveUploadsError +from dandiapi.api.services.embargo.exceptions import ( + AssetTagRemovalError, + DandisetActiveUploadsError, +) from dandiapi.api.services.exceptions import DandiError from dandiapi.api.tasks import unembargo_dandiset_task @@ -114,6 +118,54 @@ def test_unembargo_dandiset_uploads_exist(draft_version_factory, upload_factory) unembargo_dandiset(ds) +@pytest.mark.django_db() +def test_remove_dandiset_asset_blob_embargo_tags_chunks( + draft_version_factory, + asset_factory, + embargoed_asset_blob_factory, + mocker, +): + delete_asset_blob_tags_mock = mocker.patch( + 'dandiapi.api.services.embargo._delete_asset_blob_tags' + ) + chunk_size = mocker.patch('dandiapi.api.services.embargo.ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE', 2) + + draft_version: Version = draft_version_factory( + dandiset__embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING + ) + ds: Dandiset = draft_version.dandiset + for _ in range(chunk_size + 1): + asset = asset_factory(blob=embargoed_asset_blob_factory()) + draft_version.assets.add(asset) + + _remove_dandiset_asset_blob_embargo_tags(dandiset=ds) + + # Assert that _delete_asset_blob_tags was 
called chunk_size + 1 times, to ensure that it works + # correctly across chunks + assert len(delete_asset_blob_tags_mock.mock_calls) == chunk_size + 1 + + +@pytest.mark.django_db() +def test_delete_asset_blob_tags_fails( + draft_version_factory, + asset_factory, + embargoed_asset_blob_factory, + mocker, +): + mocker.patch('dandiapi.api.services.embargo._delete_asset_blob_tags', side_effect=ValueError) + draft_version: Version = draft_version_factory( + dandiset__embargo_status=Dandiset.EmbargoStatus.UNEMBARGOING + ) + ds: Dandiset = draft_version.dandiset + asset = asset_factory(blob=embargoed_asset_blob_factory()) + draft_version.assets.add(asset) + + # Check that if an exception within `_delete_asset_blob_tags` is raised, it's propagated upwards + # as an AssetTagRemovalError + with pytest.raises(AssetTagRemovalError): + _remove_dandiset_asset_blob_embargo_tags(dandiset=ds) + + @pytest.mark.django_db() def test_unembargo_dandiset( draft_version_factory,