Skip to content

Commit

Permalink
Add schedule for processing new S3 log files
Browse files Browse the repository at this point in the history
  • Loading branch information
danlamanna committed Jun 14, 2023
1 parent 943791f commit 0c5c659
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
6 changes: 6 additions & 0 deletions dandiapi/analytics/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
]
embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME

# Short-circuit if the log file has already been processed. Note that this check alone does not
# guarantee exactly-once processing; the unique constraint on ProcessedS3Log is what enforces that.
if ProcessedS3Log.objects.filter(name=Path(s3_log_key).name, embargoed=embargoed).exists():
return

s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
BlobModel = AssetBlob if not embargoed else EmbargoedAssetBlob
data = s3.get_object(Bucket=bucket, Key=s3_log_key)
Expand Down
11 changes: 11 additions & 0 deletions dandiapi/api/tasks/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.contrib.auth.models import User
from django.db import connection

from dandiapi.analytics.tasks import collect_s3_log_records_task
from dandiapi.api.mail import send_pending_users_message
from dandiapi.api.models import UserMetadata, Version
from dandiapi.api.services.metadata import version_aggregate_assets_summary
Expand Down Expand Up @@ -80,3 +81,13 @@ def register_scheduled_tasks(sender: Celery, **kwargs):

# Refresh the materialized view used by asset search every 10 mins.
sender.add_periodic_task(timedelta(minutes=10), refresh_materialized_view_search.s())

# Process new S3 logs every hour, with one periodic task registered per log bucket.
for log_bucket in [
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
]:
sender.add_periodic_task(
timedelta(hours=1),
collect_s3_log_records_task.s(log_bucket),
)

0 comments on commit 0c5c659

Please sign in to comment.