From 43b17fc54525dbd3d06ad30eef0ca61b48cca110 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Mon, 12 Dec 2022 10:47:24 -0500 Subject: [PATCH 1/2] Add `more_itertools` as a dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index d46fd3aaa..04cdd6228 100644 --- a/setup.py +++ b/setup.py @@ -56,6 +56,7 @@ 'jsonschema', 'pydantic', 'boto3[s3]', + 'more_itertools', # Production-only 'django-composed-configuration[prod]>=0.22.0', # pin directly to a version since we're extending the private multipart interface From c6e1ecea366f0f69a261ce77830a1057bf77faf9 Mon Sep 17 00:00:00 2001 From: Mike VanDenburgh Date: Mon, 12 Dec 2022 11:00:41 -0500 Subject: [PATCH 2/2] Batch bulk creates in publish task --- dandiapi/api/services/publish/__init__.py | 30 +++++++++++++++++------ 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/dandiapi/api/services/publish/__init__.py b/dandiapi/api/services/publish/__init__.py index 048e0344f..b932479ed 100644 --- a/dandiapi/api/services/publish/__init__.py +++ b/dandiapi/api/services/publish/__init__.py @@ -1,6 +1,7 @@ from django.contrib.auth.models import User from django.db import transaction from django.db.models import QuerySet +from more_itertools import ichunked from dandiapi.api import doi from dandiapi.api.asset_paths import add_version_asset_paths @@ -83,17 +84,30 @@ def _publish_dandiset(dandiset_id: int) -> None: # Add a new many-to-many association directly to any already published assets already_published_assets: QuerySet[Asset] = old_version.assets.filter(published=True) - AssetVersions.objects.bulk_create( - AssetVersions(asset_id=asset_id, version_id=new_version.id) - for asset_id in already_published_assets.values_list('id', flat=True).iterator() - ) + + # Batch bulk creates to avoid blowing up memory when there are a lot of assets + for asset_ids_batch in ichunked( + already_published_assets.values_list('id', flat=True).iterator(), 5_000 + ): + AssetVersions.objects.bulk_create( + [ + AssetVersions(asset_id=asset_id, version_id=new_version.id) + for asset_id in asset_ids_batch + ] + ) draft_assets: QuerySet[Asset] = old_version.assets.filter(published=False) - AssetVersions.objects.bulk_create( - AssetVersions(asset_id=asset.id, version_id=new_version.id) - for asset in draft_assets.iterator() - ) + # Batch bulk creates to avoid blowing up memory when there are a lot of assets + for asset_ids_batch in ichunked( + draft_assets.values_list('id', flat=True).iterator(), 5_000 + ): + AssetVersions.objects.bulk_create( + [ + AssetVersions(asset_id=asset_id, version_id=new_version.id) + for asset_id in asset_ids_batch + ] + ) # Publish any draft assets for draft_asset in draft_assets.iterator():