Make duplicate s3 log file handling more robust

danlamanna committed May 25, 2023
1 parent 2a1d364 commit 35a5d9b

Showing 4 changed files with 60 additions and 6 deletions.
@@ -0,0 +1,22 @@
# Generated by Django 4.1.4 on 2023-05-25 18:42

from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ('analytics', '0001_initial'),
    ]

    operations = [
        migrations.AlterUniqueTogether(
            name='processeds3log',
            unique_together=set(),
        ),
        migrations.AddConstraint(
            model_name='processeds3log',
            constraint=models.UniqueConstraint(
                fields=('name', 'embargoed'), name='analytics_processeds3log_unique_name_embargoed'
            ),
        ),
    ]
7 changes: 6 additions & 1 deletion dandiapi/analytics/models.py
@@ -4,7 +4,12 @@

class ProcessedS3Log(models.Model):
    class Meta:
        unique_together = ['name', 'embargoed']
        constraints = [
            models.UniqueConstraint(
                fields=['name', 'embargoed'],
                name='%(app_label)s_%(class)s_unique_name_embargoed',
            )
        ]

    name = models.CharField(
        max_length=36,
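
The templated name above resolves to the concrete constraint name that appears in the migration, and its 'unique_name_embargoed' suffix is what the task layer checks for when catching IntegrityError. A minimal sketch of that relationship, assuming a Django shell with the dandiapi project loaded (not part of this commit):

from dandiapi.analytics.models import ProcessedS3Log

# Django expands %(app_label)s and %(class)s when the model class is built,
# so the runtime constraint name matches the one written into the migration.
[constraint] = ProcessedS3Log._meta.constraints
assert constraint.name == 'analytics_processeds3log_unique_name_embargoed'
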
18 changes: 15 additions & 3 deletions dandiapi/analytics/tasks/__init__.py
@@ -3,16 +3,20 @@
from typing import Generator

from celery.app import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.db.models.aggregates import Max
from django.db.models.expressions import F
from django.db.utils import IntegrityError
from s3logparse import s3logparse

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.api.models.asset import AssetBlob, EmbargoedAssetBlob
from dandiapi.api.storage import get_boto_client, get_embargo_storage, get_storage

logger = get_task_logger(__name__)

# should be one of the DANDI_DANDISETS_*_LOG_BUCKET_NAME settings
LogBucket = str

@@ -75,9 +79,17 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
        download_counts.update({log_entry.s3_key: 1})

    with transaction.atomic():
        log = ProcessedS3Log(name=Path(s3_log_key).name, embargoed=embargoed)
        log.full_clean()
        log.save()
        try:
            log = ProcessedS3Log(name=Path(s3_log_key).name, embargoed=embargoed)
            # a unique constraint violation should fail silently, since it indicates a
            # duplicate log file. the other validations from full_clean should still be
            # raised, so disable unique checking.
            log.full_clean(validate_unique=False)
            log.save()
        except IntegrityError as e:
            if 'unique_name_embargoed' in str(e):
                logger.info(f'Already processed log file {s3_log_key}, embargo: {embargoed}')
                return

        # since this is updating a potentially large number of blobs and running in parallel,
        # it's very susceptible to deadlocking. lock the asset blobs for the duration of this
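
The comment above is cut off by the collapsed portion of the diff, so the locking code itself is not shown here. Below is a rough, hypothetical sketch of the kind of row locking that comment describes, using select_for_update inside an open transaction; the helper name and the 'blob' field lookups are assumptions for illustration, not taken from this commit:

from django.db import transaction
from django.db.models import F

from dandiapi.api.models.asset import AssetBlob


def apply_download_counts(download_counts: dict[str, int]) -> None:
    # hypothetical sketch, not the commit's actual code
    with transaction.atomic():
        # select_for_update() locks the matching rows until the transaction
        # commits, so concurrent log-processing tasks serialize on these rows
        # instead of deadlocking on interleaved updates.
        for blob in AssetBlob.objects.select_for_update().filter(blob__in=download_counts):
            AssetBlob.objects.filter(pk=blob.pk).update(
                download_count=F('download_count') + download_counts[blob.blob.name]
            )
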
19 changes: 17 additions & 2 deletions dandiapi/analytics/tests/test_download_counts.py
@@ -2,7 +2,7 @@
import pytest

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.analytics.tasks import collect_s3_log_records_task
from dandiapi.analytics.tasks import collect_s3_log_records_task, process_s3_log_file_task
from dandiapi.api.storage import (
    create_s3_storage,
    get_boto_client,
@@ -43,7 +43,7 @@ def s3_log_file(s3_log_bucket, asset_blob):
        ),
    )

    yield
    yield log_file_name

    s3.delete_object(Bucket=s3_log_bucket, Key=log_file_name)

@@ -59,10 +59,25 @@ def test_processing_s3_log_files(s3_log_bucket, s3_log_file, asset_blob):

@pytest.mark.django_db
def test_processing_s3_log_files_idempotent(s3_log_bucket, s3_log_file, asset_blob):
    # this tests that the outer task which collects the log files to process is
    # idempotent; in other words, it uses StartAfter correctly (see the sketch
    # after this test).
    collect_s3_log_records_task(s3_log_bucket)
    # run the task again; it should skip the existing log record
    collect_s3_log_records_task(s3_log_bucket)
    asset_blob.refresh_from_db()

    assert ProcessedS3Log.objects.count() == 1
    assert asset_blob.download_count == 1
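
The StartAfter referred to above is the ListObjectsV2 parameter that makes S3 return only keys sorting strictly after the given value. A rough, self-contained sketch of that idea (a hypothetical helper, not the project's actual collect task; it assumes the log objects sit at the bucket root so the object key and ProcessedS3Log.name coincide):

import boto3
from django.db.models import Max

from dandiapi.analytics.models import ProcessedS3Log


def iter_unprocessed_log_keys(bucket: str, embargoed: bool = False):
    # resume listing right after the lexicographically greatest log already
    # processed; S3 access log names are timestamp-prefixed, so this skips
    # everything that has already been handled.
    last = ProcessedS3Log.objects.filter(embargoed=embargoed).aggregate(last=Max('name'))['last']
    paginator = boto3.client('s3').get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, StartAfter=last or ''):
        for obj in page.get('Contents', []):
            yield obj['Key']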


@pytest.mark.django_db
def test_processing_s3_log_file_task_idempotent(s3_log_bucket, s3_log_file, asset_blob):
    # this tests that the inner task which processes a single log file is
    # idempotent, utilizing the unique constraint on ProcessedS3Log correctly
    # (see the sketch after this test).
    process_s3_log_file_task(s3_log_bucket, s3_log_file)
    # run the task again; it should skip the existing log record
    process_s3_log_file_task(s3_log_bucket, s3_log_file)
    asset_blob.refresh_from_db()

    assert ProcessedS3Log.objects.count() == 1
    assert asset_blob.download_count == 1
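
For context on what the constraint contributes here: inserting a second row with the same (name, embargoed) pair fails at the database level with IntegrityError, which is exactly what the task now interprets as "already processed". A minimal sketch (the log name is a made-up example of the 36-character S3 access log key format):

from django.db import IntegrityError, transaction

from dandiapi.analytics.models import ProcessedS3Log

name = '2023-05-25-18-42-00-ABCDEF0123456789'  # hypothetical example value
ProcessedS3Log.objects.create(name=name, embargoed=False)
try:
    with transaction.atomic():
        ProcessedS3Log.objects.create(name=name, embargoed=False)
except IntegrityError:
    pass  # duplicate log file; the second insert is safely ignored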
