Skip to content

Commit

Permalink
Merge pull request #2025 from dandi/use-bulk-update-for-log-processing
Browse files Browse the repository at this point in the history
  • Loading branch information
danlamanna committed Sep 9, 2024
2 parents 1ce8a91 + e52c816 commit 16f5a6e
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions dandiapi/analytics/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from django.db.models.aggregates import Max
from django.db.models.expressions import F
from django.db.utils import IntegrityError
from more_itertools import batched
from s3logparse import s3logparse

from dandiapi.analytics.models import ProcessedS3Log
Expand Down Expand Up @@ -80,10 +81,19 @@ def process_s3_log_file_task(s3_log_key: str) -> None:
logger.info('Already processed log file %s', s3_log_key)
return

# we need to store all of the fully hydrated blob objects in memory in order to use
# bulk_update, but this turns out to not be very costly. 1,000 blobs use about ~8kb
# of memory.
asset_blobs = []

# batch the blob queries to avoid a large WHERE IN clause
for batch in batched(download_counts, 1_000):
asset_blobs += AssetBlob.objects.filter(blob__in=batch)

for asset_blob in asset_blobs:
asset_blob.download_count = F('download_count') + download_counts[asset_blob.blob]

# note this task is run serially per log file. this is to avoid the contention between
# multiple log files trying to update the same blobs. this serialization is enforced through
# the task queue configuration.
for blob, download_count in download_counts.items():
AssetBlob.objects.filter(blob=blob).update(
download_count=F('download_count') + download_count
)
AssetBlob.objects.bulk_update(asset_blobs, ['download_count'], batch_size=1_000)

0 comments on commit 16f5a6e

Please sign in to comment.