Add concurrency=1 worker dyno for processing S3 logs #1614

Merged
merged 1 commit into from
Jun 5, 2023
4 changes: 3 additions & 1 deletion Procfile
@@ -6,6 +6,8 @@ web: gunicorn --bind 0.0.0.0:$PORT dandiapi.wsgi --timeout 25
 # This is OK for now because of how lightweight all high priority tasks currently are,
 # but we may need to switch back to a dedicated worker in the future.
 # The queue `celery` is the default queue.
-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery,s3-log-processing -B
+worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q celery -B
 # The checksum-worker calculates blob checksums and updates zarr checksum files
 checksum-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO -Q calculate_sha256,ingest_zarr_archive
+# The analytics-worker processes s3 log files serially
+analytics-worker: REMAP_SIGTERM=SIGQUIT celery --app dandiapi.celery worker --loglevel INFO --concurrency 1 -Q s3-log-processing
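
The analytics-worker entry relies on `--concurrency 1` so that only one log file is processed at a time. A minimal standard-library sketch of that idea follows. It is not the project's Celery setup: a single-threaded executor stands in for the worker, and `process_log_file`, `totals`, and the blob keys are hypothetical names used only for illustration.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the per-blob download counters kept in the database.
totals = Counter()


def process_log_file(download_counts: dict[str, int]) -> None:
    """Apply one log file's per-blob counts to the shared totals."""
    for blob, count in download_counts.items():
        totals[blob] += count


# Hypothetical parsed log files: blob key -> downloads seen in that file.
log_files = [
    {"blobs/a": 2, "blobs/b": 1},
    {"blobs/a": 3},
    {"blobs/b": 4, "blobs/c": 1},
]

# max_workers=1 plays the role of `--concurrency 1`: tasks run one at a time,
# in submission order, so two log files never update the same blob at once.
with ThreadPoolExecutor(max_workers=1) as executor:
    for counts in log_files:
        executor.submit(process_log_file, counts)

print(dict(totals))
```

With a single worker the ordering guarantee comes from the queue rather than from locks in the task body, which is the trade-off this Procfile change makes.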
13 changes: 6 additions & 7 deletions dandiapi/analytics/tasks/__init__.py
@@ -90,11 +90,10 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
         logger.info(f'Already processed log file {s3_log_key}, embargo: {embargoed}')
         return

-    # since this is updating a potentially large number of blobs and running in parallel,
-    # it's very susceptible to deadlocking. lock the asset blobs for the duration of this
-    # transaction. this is a quick-ish loop and updates to blobs otherwise are rare.
-    blobs = BlobModel.objects.select_for_update().filter(blob__in=download_counts.keys())
-    for blob in blobs:
-        BlobModel.objects.filter(blob=blob.blob).update(
-            download_count=F('download_count') + download_counts[blob.blob]
+    # note this task is run serially per log file. this is to avoid the contention between
+    # multiple log files trying to update the same blobs. this serialization is enforced through
+    # the task queue configuration.
+    for blob, download_count in download_counts.items():
+        BlobModel.objects.filter(blob=blob).update(
+            download_count=F('download_count') + download_count
         )
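
In the Django ORM, `F('download_count') + download_count` asks the database to perform the increment inside the UPDATE statement rather than reading the value into Python first. A rough sketch of the equivalent SQL, using the standard-library `sqlite3` module, is shown here. The `blob_model` table, its `blob_key` column, and the sample keys are assumptions for illustration, not the project's schema.

```python
import sqlite3

# In-memory database with a hypothetical table standing in for BlobModel.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE blob_model (blob_key TEXT PRIMARY KEY, download_count INTEGER NOT NULL)"
)
conn.executemany(
    "INSERT INTO blob_model VALUES (?, ?)",
    [("blobs/a", 10), ("blobs/b", 0)],
)

# Hypothetical counts parsed from one log file; one key has no matching row.
download_counts = {"blobs/a": 3, "blobs/b": 7, "blobs/missing": 1}

# One transaction for the whole file; each UPDATE increments in the database,
# so no value is read into Python and written back.
with conn:
    for blob, download_count in download_counts.items():
        conn.execute(
            "UPDATE blob_model SET download_count = download_count + ? WHERE blob_key = ?",
            (download_count, blob),
        )

rows = dict(conn.execute("SELECT blob_key, download_count FROM blob_model ORDER BY blob_key"))
print(rows)
```

As with `filter(...).update(...)`, a key that matches no row simply updates nothing, so the loop does not need to look up the blobs beforehand.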