diff --git a/Procfile b/Procfile
index b448cb0cc..8c35ab456 100644
--- a/Procfile
+++ b/Procfile
@@ -6,6 +6,8 @@ web: gunicorn --bind 0.0.0.0:$PORT dandiapi.wsgi --timeout 25
 # This is OK for now because of how lightweight all high priority tasks currently are,
 # but we may need to switch back to a dedicated worker in the future.
 # The queue `celery` is the default queue.
-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery,s3-log-processing -B
+worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery -B
 # The checksum-worker calculates blob checksums and updates zarr checksum files
 checksum-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q calculate_sha256,ingest_zarr_archive
+# The analytics-worker processes s3 log files serially
+analytics-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO --concurrency 1 -Q s3-log-processing
diff --git a/dandiapi/analytics/tasks/__init__.py b/dandiapi/analytics/tasks/__init__.py
index 8e89eb981..562345317 100644
--- a/dandiapi/analytics/tasks/__init__.py
+++ b/dandiapi/analytics/tasks/__init__.py
@@ -90,11 +90,10 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
         logger.info(f'Already processed log file {s3_log_key}, embargo: {embargoed}')
         return
 
-    # since this is updating a potentially large number of blobs and running in parallel,
-    # it's very susceptible to deadlocking. lock the asset blobs for the duration of this
-    # transaction. this is a quick-ish loop and updates to blobs otherwise are rare.
-    blobs = BlobModel.objects.select_for_update().filter(blob__in=download_counts.keys())
-    for blob in blobs:
-        BlobModel.objects.filter(blob=blob.blob).update(
-            download_count=F('download_count') + download_counts[blob.blob]
+    # note this task is run serially per log file. this is to avoid the contention between
+    # multiple log files trying to update the same blobs. this serialization is enforced through
+    # the task queue configuration.
+    for blob, download_count in download_counts.items():
+        BlobModel.objects.filter(blob=blob).update(
+            download_count=F('download_count') + download_count
         )
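The new comment says the per-log-file serialization is "enforced through the task queue configuration": the dedicated analytics-worker consumes only the `s3-log-processing` queue with `--concurrency 1`, so tasks on that queue run one at a time. The following is a minimal sketch of what routing the task to that queue could look like; the setting name (Celery's Django integration with the `CELERY_` namespace) and its placement are assumptions, not taken from this diff.

    # Illustrative only: route process_s3_log_file_task to the dedicated
    # `s3-log-processing` queue so it is consumed by the analytics-worker,
    # which runs with --concurrency 1 and therefore handles log files serially.
    CELERY_TASK_ROUTES = {
        'dandiapi.analytics.tasks.process_s3_log_file_task': {'queue': 's3-log-processing'},
    }

    # Equivalently, callers could pin the queue at dispatch time (hypothetical call site):
    # process_s3_log_file_task.apply_async(args=[bucket, s3_log_key], queue='s3-log-processing')

With the queue consumed by a single-concurrency worker, the `F('download_count') + download_count` updates are issued by at most one task at a time, so the `select_for_update()` lock removed in this diff is no longer needed to prevent deadlocks.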