Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cancel upload when BlobWriter exits with exception #1243

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ setup.py file. Applications which do not import directly from
`google-resumable-media` can safely disregard this dependency. This backwards
compatibility feature will be removed in a future major version update.

Miscellaneous
~~~~~~~~~~~~~

- The BlobWriter class now attempts to terminate an ongoing resumable upload if
the writer exits with an exception.

Quick Start
-----------

Expand Down
2 changes: 1 addition & 1 deletion docs/storage/exceptions.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Exceptions
~~~~~~~~~
~~~~~~~~~~

.. automodule:: google.cloud.storage.exceptions
:members:
Expand Down
13 changes: 13 additions & 0 deletions google/cloud/storage/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,19 @@ def close(self):
self._upload_chunks_from_buffer(1)
self._buffer.close()

def terminate(self):
"""Cancel the ResumableUpload."""
if self._upload_and_transport:
upload, transport = self._upload_and_transport
transport.delete(upload.upload_url)
self._buffer.close()

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.terminate()
else:
self.close()

@property
def closed(self):
return self._buffer.closed
Expand Down
40 changes: 40 additions & 0 deletions tests/system/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# limitations under the License.


import pytest

ddelange marked this conversation as resolved.
Show resolved Hide resolved
from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
from .test_blob import _check_blob_hash


Expand Down Expand Up @@ -76,3 +79,40 @@ def test_blobwriter_and_blobreader_text_mode(
assert text_data[:100] == reader.read(100)
assert 0 == reader.seek(0)
assert reader.read() == text_data


def test_blobwriter_exit(
shared_bucket,
blobs_to_delete,
service_account,
):
blob = shared_bucket.blob("NeverUploaded")

# no-op when nothing was uploaded yet
with pytest.raises(ValueError, match="SIGTERM received"):
with blob.open("wb") as writer:
writer.write(b"first chunk") # not yet uploaded
raise ValueError("SIGTERM received") # no upload to cancel in __exit__
# blob should not exist
assert not blob.exists()

# unhandled exceptions should cancel the upload
with pytest.raises(ValueError, match="SIGTERM received"):
with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
writer.write(b"first chunk") # not yet uploaded
writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded
raise ValueError("SIGTERM received") # upload is cancelled in __exit__
# blob should not exist
assert not blob.exists()

# handled exceptions should not cancel the upload
with blob.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
writer.write(b"first chunk") # not yet uploaded
writer.write(bytes(CHUNK_SIZE_MULTIPLE)) # uploaded
try:
raise ValueError("This is fine")
except ValueError:
pass # no exception context passed to __exit__
blobs_to_delete.append(blob)
# blob should have been uploaded
assert blob.exists()
61 changes: 54 additions & 7 deletions tests/unit/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import mock

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
from google.cloud.storage.retry import DEFAULT_RETRY

TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n"
Expand Down Expand Up @@ -377,7 +378,7 @@ def test_write(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -426,6 +427,52 @@ def test_close_errors(self):
with self.assertRaises(ValueError):
writer.write(TEST_BINARY_DATA)

def test_terminate_after_initiate(self):
blob = mock.Mock()

upload = mock.Mock(upload_url="dummy")
transport = mock.Mock()

blob._initiate_resumable_upload.return_value = (upload, transport)

with self.assertRaises(RuntimeError):
with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # initiate upload
raise RuntimeError # should terminate the upload
blob._initiate_resumable_upload.assert_called_once() # upload initiated
self.assertTrue(writer.closed) # terminate called
transport.delete.assert_called_with("dummy") # resumable upload terminated

def test_terminate_before_initiate(self):
blob = mock.Mock()

upload = mock.Mock()
transport = mock.Mock()

blob._initiate_resumable_upload.return_value = (upload, transport)

with self.assertRaises(RuntimeError):
with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
writer.write(bytes(CHUNK_SIZE_MULTIPLE - 1)) # upload not yet initiated
raise RuntimeError # there is no resumable upload to terminate
blob._initiate_resumable_upload.assert_not_called() # upload not yet initiated
self.assertTrue(writer.closed) # terminate called
transport.delete.assert_not_called() # there's no resumable upload to terminate

def test_terminate_skipped(self):
blob = mock.Mock()

upload = mock.Mock()
transport = mock.Mock()

blob._initiate_resumable_upload.return_value = (upload, transport)

with self._make_blob_writer(blob, chunk_size=CHUNK_SIZE_MULTIPLE) as writer:
writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1)) # upload initiated
blob._initiate_resumable_upload.assert_called() # upload initiated
self.assertTrue(writer.closed) # close called
transport.delete.assert_not_called() # terminate not called

def test_flush_fails(self):
blob = mock.Mock(chunk_size=None)
writer = self._make_blob_writer(blob)
Expand Down Expand Up @@ -468,7 +515,7 @@ def test_conditional_retry_failure(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -520,7 +567,7 @@ def test_conditional_retry_pass(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -573,7 +620,7 @@ def test_forced_default_retry(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -619,7 +666,7 @@ def test_num_retries_and_retry_conflict(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. The mock will raise a ValueError, simulating
# actual behavior when num_retries and retry are both specified.
Expand Down Expand Up @@ -673,7 +720,7 @@ def test_num_retries_only(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -965,7 +1012,7 @@ def test_write(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write all data and close.
writer.write(TEST_MULTIBYTE_TEXT_DATA[2:])
Expand Down