Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respond with 409 when creating duplicate asset blobs #2011

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions dandiapi/api/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,16 +524,10 @@ def test_upload_validate_wrong_etag(api_client, user, upload):
def test_upload_validate_existing_assetblob(api_client, user, upload, asset_blob_factory):
api_client.force_authenticate(user=user)

asset_blob = asset_blob_factory(etag=upload.etag, size=upload.size)
asset_blob_factory(etag=upload.etag, size=upload.size)

resp = api_client.post(f'/api/uploads/{upload.upload_id}/validate/')
assert resp.status_code == 200
assert resp.data == {
'blob_id': str(asset_blob.blob_id),
'etag': asset_blob.etag,
'sha256': asset_blob.sha256,
'size': asset_blob.size,
}
assert resp.status_code == 409

assert AssetBlob.objects.all().count() == 1
assert not Upload.objects.all().exists()
Expand All @@ -549,16 +543,10 @@ def test_upload_validate_embargo_existing_assetblob(
embargoed_upload = embargoed_upload_factory(dandiset=dandiset)

# The upload should recognize this preexisting AssetBlob and use it instead
asset_blob = asset_blob_factory(etag=embargoed_upload.etag, size=embargoed_upload.size)
asset_blob_factory(etag=embargoed_upload.etag, size=embargoed_upload.size)

resp = api_client.post(f'/api/uploads/{embargoed_upload.upload_id}/validate/')
assert resp.status_code == 200
assert resp.data == {
'blob_id': str(asset_blob.blob_id),
'etag': asset_blob.etag,
'sha256': asset_blob.sha256,
'size': asset_blob.size,
}
assert resp.status_code == 409

assert AssetBlob.objects.all().count() == 1

Expand All @@ -574,17 +562,9 @@ def test_upload_validate_embargo_existing_embargoed_assetblob(

# The upload should recognize this preexisting embargoed AssetBlob and use it instead
# This only works because the embargoed asset blob belongs to the same dandiset
embargoed_asset_blob = embargoed_asset_blob_factory(
etag=embargoed_upload.etag, size=embargoed_upload.size
)
embargoed_asset_blob_factory(etag=embargoed_upload.etag, size=embargoed_upload.size)

resp = api_client.post(f'/api/uploads/{embargoed_upload.upload_id}/validate/')
assert resp.status_code == 200
assert resp.data == {
'blob_id': str(embargoed_asset_blob.blob_id),
'etag': embargoed_asset_blob.etag,
'sha256': embargoed_asset_blob.sha256,
'size': embargoed_asset_blob.size,
}
assert resp.status_code == 409

assert AssetBlob.objects.all().count() == 1
27 changes: 15 additions & 12 deletions dandiapi/api/views/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,23 +240,26 @@ def upload_validate_view(request: Request, upload_id: str) -> HttpResponseBase:
f'ETag {upload.etag} does not match actual ETag {upload.actual_etag()}.'
)

# Use a transaction here so we can use select_for_update to lock the DB rows to avoid
# a race condition where two clients are uploading the same blob at the same time.
# This also ensures that the minting of the new AssetBlob and deletion of the Upload
# is an atomic operation.
with transaction.atomic():
try:
# Perhaps another upload completed before this one and has already created an AssetBlob.
asset_blob = AssetBlob.objects.select_for_update(no_key=True).get(
etag=upload.etag, size=upload.size
)
except AssetBlob.DoesNotExist:
asset_blob = upload.to_asset_blob()
asset_blob.save()
# Avoid a race condition where two clients are uploading the same blob at the same time.
asset_blob, created = AssetBlob.objects.get_or_create(
etag=upload.etag,
size=upload.size,
defaults={
'embargoed': upload.embargoed,
'blob_id': upload.upload_id,
'blob': upload.blob,
},
danlamanna marked this conversation as resolved.
Show resolved Hide resolved
)

# Clean up the upload
upload.delete()

if not created:
return Response(
'An identical blob has already been uploaded.', status=status.HTTP_409_CONFLICT
)

# Start calculating the sha256 in the background
calculate_sha256.delay(asset_blob.blob_id)

Expand Down