diff --git a/dandiapi/analytics/tasks/__init__.py b/dandiapi/analytics/tasks/__init__.py
index 562345317..9a8e1f8c7 100644
--- a/dandiapi/analytics/tasks/__init__.py
+++ b/dandiapi/analytics/tasks/__init__.py
@@ -67,6 +67,12 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
         settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
     ]
     embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
+
+    # Short-circuit if the log file has already been processed. Note that this doesn't guarantee
+    # exactly-once processing; that's what the unique constraint on ProcessedS3Log is for.
+    if ProcessedS3Log.objects.filter(name=Path(s3_log_key).name, embargoed=embargoed).exists():
+        return
+
     s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
     BlobModel = AssetBlob if not embargoed else EmbargoedAssetBlob
     data = s3.get_object(Bucket=bucket, Key=s3_log_key)
diff --git a/dandiapi/api/tasks/scheduled.py b/dandiapi/api/tasks/scheduled.py
index 7b99b4588..5608b7d4e 100644
--- a/dandiapi/api/tasks/scheduled.py
+++ b/dandiapi/api/tasks/scheduled.py
@@ -13,6 +13,7 @@
 from django.contrib.auth.models import User
 from django.db import connection
 
+from dandiapi.analytics.tasks import collect_s3_log_records_task
 from dandiapi.api.mail import send_pending_users_message
 from dandiapi.api.models import UserMetadata, Version
 from dandiapi.api.services.metadata import version_aggregate_assets_summary
@@ -80,3 +81,13 @@ def register_scheduled_tasks(sender: Celery, **kwargs):
 
     # Refresh the materialized view used by asset search every 10 mins.
     sender.add_periodic_task(timedelta(minutes=10), refresh_materialized_view_search.s())
+
+    # Process new S3 logs every hour
+    for log_bucket in [
+        settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
+        settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
+    ]:
+        sender.add_periodic_task(
+            timedelta(hours=1),
+            collect_s3_log_records_task.s(log_bucket),
+        )
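
The early return in the first hunk is only a fast path; the comment defers the real exactly-once guarantee to a unique constraint on ProcessedS3Log. A minimal sketch of what that model could look like, assuming a Django model shaped the way the filter call implies (the field names come from the diff; the constraint name, field length, and overall definition are assumptions, not the actual model):

# Hypothetical sketch of ProcessedS3Log, inferred from the filter call in
# process_s3_log_file_task above; not the repository's actual model definition.
from django.db import models


class ProcessedS3Log(models.Model):
    # The log file's name, i.e. Path(s3_log_key).name from the task above.
    name = models.CharField(max_length=255)  # max_length is a guess
    # Whether the log came from the embargoed log bucket.
    embargoed = models.BooleanField()

    class Meta:
        constraints = [
            # Two racing tasks can both pass the .exists() check, but the
            # second insert violates this constraint, so each (name, embargoed)
            # pair is recorded at most once at the database level.
            models.UniqueConstraint(
                fields=['name', 'embargoed'],
                name='unique-name-embargoed',  # assumed constraint name
            ),
        ]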