Merge pull request #1605 from dandi/s3-log-processing-fix
danlamanna authored May 26, 2023
2 parents 7631b99 + 6913d7e commit 325d284
Showing 5 changed files with 69 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Procfile
@@ -6,6 +6,6 @@ web: gunicorn --bind 0.0.0.0:$PORT dandiapi.wsgi --timeout 25
# This is OK for now because of how lightweight all high priority tasks currently are,
# but we may need to switch back to a dedicated worker in the future.
# The queue `celery` is the default queue.
-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery -B
+worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery,s3-log-processing -B
# The checksum-worker calculates blob checksums and updates zarr checksum files
checksum-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q calculate_sha256,ingest_zarr_archive
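
The Procfile change only makes the default worker consume the new `s3-log-processing` queue; tasks opt into that queue via the `queue=` option on their Celery decorator, as the changes to `dandiapi/analytics/tasks/__init__.py` below show. A minimal sketch of the pattern (the task name and body here are invented for illustration):

```python
from celery.app import shared_task


# Routed to 's3-log-processing' by default; a worker started with
# `-Q celery,s3-log-processing` will consume it, one started with only
# `-Q celery` will not.
@shared_task(queue='s3-log-processing', soft_time_limit=60, time_limit=80)
def example_log_task(bucket: str) -> None:
    ...
```
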
22 changes: 22 additions & 0 deletions (new migration under dandiapi/analytics/migrations/)
@@ -0,0 +1,22 @@
# Generated by Django 4.1.4 on 2023-05-25 18:42

from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ('analytics', '0001_initial'),
    ]

    operations = [
        migrations.AlterUniqueTogether(
            name='processeds3log',
            unique_together=set(),
        ),
        migrations.AddConstraint(
            model_name='processeds3log',
            constraint=models.UniqueConstraint(
                fields=('name', 'embargoed'), name='analytics_processeds3log_unique_name_embargoed'
            ),
        ),
    ]
7 changes: 6 additions & 1 deletion dandiapi/analytics/models.py
@@ -4,7 +4,12 @@

class ProcessedS3Log(models.Model):
    class Meta:
-        unique_together = ['name', 'embargoed']
+        constraints = [
+            models.UniqueConstraint(
+                fields=['name', 'embargoed'],
+                name='%(app_label)s_%(class)s_unique_name_embargoed',
+            )
+        ]

    name = models.CharField(
        max_length=36,
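
Together with the migration above, the model-level change means duplicate `(name, embargoed)` pairs are now rejected by the database itself rather than only by `full_clean()` validation. A rough, hypothetical illustration of the resulting behavior (the log name below is made up, and `.create()` is used so that only the database constraint, not field validators, is exercised):

```python
from django.db import IntegrityError, transaction

from dandiapi.analytics.models import ProcessedS3Log

ProcessedS3Log.objects.create(name='2023-05-25-18-42-00-0123456789ABCDEF', embargoed=False)

try:
    with transaction.atomic():
        # same (name, embargoed) pair -> violates
        # analytics_processeds3log_unique_name_embargoed
        ProcessedS3Log.objects.create(
            name='2023-05-25-18-42-00-0123456789ABCDEF', embargoed=False
        )
except IntegrityError:
    pass  # the duplicate log file is already recorded

# the same name with embargoed=True is a different pair, so it is still allowed
ProcessedS3Log.objects.create(name='2023-05-25-18-42-00-0123456789ABCDEF', embargoed=True)
```
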
32 changes: 23 additions & 9 deletions dandiapi/analytics/tasks/__init__.py
@@ -3,16 +3,20 @@
from typing import Generator

from celery.app import shared_task
+from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.db.models.aggregates import Max
from django.db.models.expressions import F
+from django.db.utils import IntegrityError
from s3logparse import s3logparse

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.api.models.asset import AssetBlob, EmbargoedAssetBlob
from dandiapi.api.storage import get_boto_client, get_embargo_storage, get_storage

+logger = get_task_logger(__name__)
+
# should be one of the DANDI_DANDISETS_*_LOG_BUCKET_NAME settings
LogBucket = str

@@ -33,7 +37,7 @@ def _bucket_objects_after(bucket: str, after: str | None) -> Generator[dict, None, None]:
        yield from page.get('Contents', [])


-@shared_task(soft_time_limit=60, time_limit=80)
+@shared_task(queue='s3-log-processing', soft_time_limit=60, time_limit=80)
def collect_s3_log_records_task(bucket: LogBucket) -> None:
    """Dispatch a task per S3 log file to process for download counts."""
    assert bucket in [
@@ -49,7 +53,7 @@ def collect_s3_log_records_task(bucket: LogBucket) -> None:
        process_s3_log_file_task.delay(bucket, s3_log_object['Key'])


-@shared_task(soft_time_limit=120, time_limit=140)
+@shared_task(queue='s3-log-processing', soft_time_limit=120, time_limit=140)
def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
    """
    Process a single S3 log file for download counts.
@@ -58,7 +62,6 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
    asset blobs. Prevents duplicate processing with a unique constraint on the ProcessedS3Log name
    and embargoed fields.
    """
-    return
    assert bucket in [
        settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
        settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
@@ -76,11 +79,22 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
            download_counts.update({log_entry.s3_key: 1})

    with transaction.atomic():
-        log = ProcessedS3Log(name=Path(s3_log_key).name, embargoed=embargoed)
-        log.full_clean()
-        log.save()
+        try:
+            log = ProcessedS3Log(name=Path(s3_log_key).name, embargoed=embargoed)
+            # disable constraint validation checking so duplicate errors can be detected and
+            # ignored. the rest of the full_clean errors should still be raised.
+            log.full_clean(validate_constraints=False)
+            log.save()
+        except IntegrityError as e:
+            if 'unique_name_embargoed' in str(e):
+                logger.info(f'Already processed log file {s3_log_key}, embargo: {embargoed}')
+                return

-        for blob, download_count in download_counts.items():
-            BlobModel.objects.filter(blob=blob).update(
-                download_count=F('download_count') + download_count
+        # since this is updating a potentially large number of blobs and running in parallel,
+        # it's very susceptible to deadlocking. lock the asset blobs for the duration of this
+        # transaction. this is a quick-ish loop and updates to blobs otherwise are rare.
+        blobs = BlobModel.objects.select_for_update().filter(blob__in=download_counts.keys())
+        for blob in blobs:
+            BlobModel.objects.filter(blob=blob.blob).update(
+                download_count=F('download_count') + download_counts[blob.blob]
            )
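
Taken together, the collector lists log files that have not been processed yet and fans out one `process_s3_log_file_task` per key, while the unique constraint turns any re-processing into a logged no-op. A minimal sketch of kicking the pipeline off by hand from a configured Django shell (the bucket setting names are the ones asserted in the tasks above):

```python
from django.conf import settings

from dandiapi.analytics.tasks import collect_s3_log_records_task

# Enqueued onto the new 's3-log-processing' queue; the Procfile's default
# worker now consumes that queue alongside 'celery'.
collect_s3_log_records_task.delay(settings.DANDI_DANDISETS_LOG_BUCKET_NAME)
```
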
21 changes: 17 additions & 4 deletions dandiapi/analytics/tests/test_download_counts.py
@@ -2,7 +2,7 @@
import pytest

from dandiapi.analytics.models import ProcessedS3Log
-from dandiapi.analytics.tasks import collect_s3_log_records_task
+from dandiapi.analytics.tasks import collect_s3_log_records_task, process_s3_log_file_task
from dandiapi.api.storage import (
create_s3_storage,
get_boto_client,
@@ -43,13 +43,12 @@ def s3_log_file(s3_log_bucket, asset_blob):
        ),
    )

-    yield
+    yield log_file_name

    s3.delete_object(Bucket=s3_log_bucket, Key=log_file_name)


@pytest.mark.django_db
-@pytest.mark.skip(reason='Temporarily disabled')
def test_processing_s3_log_files(s3_log_bucket, s3_log_file, asset_blob):
    collect_s3_log_records_task(s3_log_bucket)
    asset_blob.refresh_from_db()
@@ -59,12 +58,26 @@ def test_processing_s3_log_files(s3_log_bucket, s3_log_file, asset_blob):


@pytest.mark.django_db
-@pytest.mark.skip(reason='Temporarily disabled')
def test_processing_s3_log_files_idempotent(s3_log_bucket, s3_log_file, asset_blob):
+    # this tests that the outer task which collects the log files to process is
+    # idempotent, in other words, it uses StartAfter correctly.
    collect_s3_log_records_task(s3_log_bucket)
    # run the task again, it should skip the existing log record
    collect_s3_log_records_task(s3_log_bucket)
    asset_blob.refresh_from_db()

    assert ProcessedS3Log.objects.count() == 1
    assert asset_blob.download_count == 1


+@pytest.mark.django_db
+def test_processing_s3_log_file_task_idempotent(s3_log_bucket, s3_log_file, asset_blob):
+    # this tests that the inner task which processes a single log file is
+    # idempotent, utilizing the unique constraint on ProcessedS3Log correctly.
+    process_s3_log_file_task(s3_log_bucket, s3_log_file)
+    # run the task again, it should ignore the new log
+    process_s3_log_file_task(s3_log_bucket, s3_log_file)
+    asset_blob.refresh_from_db()
+
+    assert ProcessedS3Log.objects.count() == 1
+    assert asset_blob.download_count == 1
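
The outer-task idempotency test relies on the listing helper skipping everything at or before the most recently processed log name. A rough sketch of that StartAfter idea, mirroring `_bucket_objects_after` from the tasks module (the helper name and the bare boto3 client here are simplifying assumptions):

```python
from __future__ import annotations

from typing import Generator

import boto3


def list_log_keys_after(bucket: str, after: str | None) -> Generator[str, None, None]:
    """Yield object keys lexicographically after `after`, or all keys when it is None."""
    s3 = boto3.client('s3')
    kwargs = {'StartAfter': after} if after else {}
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, **kwargs):
        for obj in page.get('Contents', []):
            yield obj['Key']
```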
