-
Notifications
You must be signed in to change notification settings - Fork 156
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add blob.open() for file-like I/O
- Loading branch information
Showing
5 changed files
with
1,008 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,327 @@ | ||
# Copyright 2021 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import io | ||
|
||
# Resumable uploads require a chunk size of precisely a multiple of 256KiB.
DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MiB (40 times minimum chunk size).

# Valid keyword arguments for download methods, and blob.reload() if needed.
# BlobReader.__init__ rejects any keyword argument not listed here.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_DOWNLOAD_KWARGS = {
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
}

# Valid keyword arguments for upload methods.
# BlobWriter.__init__ rejects any keyword argument not listed here.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
    "content_type",
    "num_retries",
    "predefined_acl",
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
    "timeout",
    "checksum",
}
|
||
|
||
class BlobReader(io.BufferedIOBase):
    """A file-like object for reading from a blob, with buffered downloads.

    Data is downloaded in pieces of at least ``chunk_size`` bytes and cached
    in an in-memory buffer, so small sequential reads do not each issue a
    separate download request.

    :param blob: The blob to download from.
    :param chunk_size: (Optional) The minimum number of bytes to download per
        request. Defaults to ``blob.chunk_size`` or ``DEFAULT_CHUNK_SIZE``.
    :param download_kwargs: Keyword arguments passed through to
        ``blob.download_as_bytes()`` and, when ``seek()`` needs the blob
        size, to ``blob.reload()``. Only names listed in
        ``VALID_DOWNLOAD_KWARGS`` are accepted.
    :raises ValueError: If an unsupported keyword argument is supplied.
    """

    def __init__(self, blob, chunk_size=None, **download_kwargs):
        # Note: download_kwargs are also forwarded to blob.reload() in seek().
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    "BlobReader does not support keyword argument {}.".format(kwarg)
                )

        self._blob = blob
        # Absolute position of the next read, in bytes from the blob start.
        self._pos = 0
        # Prefetched bytes. Invariant: the blob position corresponding to
        # buffer offset 0 is (self._pos - self._buffer.tell()).
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        """Read and return up to ``size`` bytes (all remaining if negative).

        Serves the request from the buffer when possible; otherwise downloads
        at least ``self._chunk_size`` bytes and buffers any excess.
        """
        self._checkClosed()  # Raises ValueError if closed.

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos + len(result)
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                # A negative size means "read to the end of the blob".
                fetch_end = None

            # Download the blob.
            result += self._blob.download_as_bytes(
                start=fetch_start, end=fetch_end, **self._download_kwargs
            )

            # If more bytes were read than is immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]

        self._pos += len(result)

        return result

    def read1(self, size=-1):
        """Read up to ``size`` bytes; delegates to read()."""
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the blob last byte.
        If the blob size is not already known it will call blob.reload().

        :raises ValueError: If ``whence`` is not 0, 1 or 2, or if the stream
            is closed.
        """
        self._checkClosed()  # Raises ValueError if closed.

        if self._blob.size is None:
            self._blob.reload(**self._download_kwargs)

        initial_pos = self._pos

        if whence == 0:
            self._pos = pos
        elif whence == 1:
            self._pos += pos
        elif whence == 2:
            self._pos = self._blob.size + pos
        else:
            raise ValueError("invalid whence value")

        if self._pos > self._blob.size:
            self._pos = self._blob.size

        # Reposition within the buffer if the new position falls inside the
        # buffered range; otherwise invalidate the buffer. The target offset
        # is computed absolutely because a relative BytesIO.seek() to a
        # negative position raises ValueError (which previously made any
        # seek backward past the buffered region crash).
        target_buffer_pos = self._buffer.tell() + (self._pos - initial_pos)
        buffer_len = self._buffer.seek(0, io.SEEK_END)  # Length of buffered data.
        if 0 <= target_buffer_pos <= buffer_len:
            self._buffer.seek(target_buffer_pos)
        else:
            # Buffer does not contain the new position; invalidate it.
            self._buffer.seek(0)
            self._buffer.truncate(0)

        return self._pos

    def close(self):
        """Release the internal buffer; subsequent I/O raises ValueError."""
        self._buffer.close()

    def _checkClosed(self):
        # The reader is considered closed exactly when its buffer is.
        if self._buffer.closed:
            raise ValueError("I/O operation on closed file.")

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True
|
||
|
||
class BlobWriter(io.BufferedIOBase):
    """A file-like object for writing to a blob via a resumable upload.

    Written bytes accumulate in an internal SlidingBuffer and are transmitted
    in ``chunk_size`` pieces once enough data is available. The resumable
    upload is initiated lazily, on the first transmission, and finalized by
    close().

    :param blob: The blob to upload to.
    :param chunk_size: (Optional) The upload chunk size. Resumable uploads
        require a multiple of 256 KiB. Defaults to ``blob.chunk_size`` or
        ``DEFAULT_CHUNK_SIZE``.
    :param text_mode: (Optional) True when this writer is wrapped in a
        TextIOWrapper, whose close() calls flush() first; in that mode
        flush() is a silent no-op instead of raising.
    :param upload_kwargs: Keyword arguments forwarded to the resumable
        upload machinery. Only names listed in ``VALID_UPLOAD_KWARGS`` are
        accepted.
    :raises ValueError: If an unsupported keyword argument is supplied.
    """

    def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    "BlobWriter does not support keyword argument {}.".format(kwarg)
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        self._upload_and_transport = None
        # Resumable uploads require a chunk size of a multiple of 256KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        # In text mode this class will be wrapped and TextIOWrapper requires a
        # different behavior of flush().
        self._text_mode = text_mode
        self._upload_kwargs = upload_kwargs

    def write(self, b):
        """Buffer ``b`` and transmit any complete chunks.

        Returns the buffer's reported write position (see SlidingBuffer.write).
        """
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos

    def _initiate_upload(self):
        """Start the resumable upload session (called on first transmission)."""
        # Pop these two so they are not passed both positionally and as
        # part of **self._upload_kwargs below.
        num_retries = self._upload_kwargs.pop("num_retries", None)
        content_type = self._upload_kwargs.pop("content_type", None)

        if (
            self._upload_kwargs.get("if_metageneration_match") is None
            and num_retries is None
        ):
            # Uploads are only idempotent (safe to retry) if
            # if_metageneration_match is set. If it is not set, the default
            # num_retries should be 0. Note: Because retry logic for uploads is
            # provided by the google-resumable-media-python package, it doesn't
            # use the ConditionalRetryStrategy class used in other API calls in
            # this library to solve this problem.
            num_retries = 0

        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            num_retries,
            chunk_size=self._chunk_size,
            **self._upload_kwargs
        )

    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        """Report the total number of bytes written so far."""
        # Bytes already consumed by the upload plus bytes still buffered.
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        """Raise, except in text mode where it must silently succeed.

        :raises io.UnsupportedOperation: Whenever called outside text mode,
            because flushing would require finalizing the upload.
        """
        if self._text_mode:
            # TextIOWrapper expects this method to succeed before calling close().
            return

        raise io.UnsupportedOperation(
            "Cannot flush without finalizing upload. Use close() instead."
        )

    def close(self):
        """Transmit any remaining buffered data, finalize the upload, close.

        Per the io.IOBase contract, calling close() more than once is safe:
        subsequent calls are no-ops (previously a second close() raised
        ValueError).
        """
        if not self._buffer.closed:
            # Transmitting a final (possibly short or empty) chunk finalizes
            # the resumable upload.
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    def _checkClosed(self):
        # The writer is considered closed exactly when its buffer is.
        if self._buffer.closed:
            raise ValueError("I/O operation on closed file.")

    def readable(self):
        return False

    def writable(self):
        return True

    def seekable(self):
        return False
|
||
|
||
class SlidingBuffer(object):
    """A forward-only buffer that frees the memory of bytes already consumed.

    ``google-resumable-media-python`` expects ``tell()`` to report a position
    relative to the start of the whole upload, not to whichever intermediate
    buffer happens to hold the bytes. This class presents that external
    behavior while internally discarding data that has already been sent, so
    sent bytes never have to be kept in memory.

    It deliberately deviates from an ordinary BytesIO: ``write()`` always
    appends at the very end and leaves the read position alone; ``flush()``
    discards everything to the left of the read position; ``tell()`` counts
    all bytes ever read, including discarded ones; and ``__len__()`` reports
    the size of the live underlying buffer. Only the subset of the Python
    I/O interface needed by the upload machinery is implemented.
    """

    def __init__(self):
        self._buffer = io.BytesIO()
        self._cursor = 0

    def write(self, b):
        """Append to the end of the buffer without changing the position."""
        self._checkClosed()  # Raises ValueError if closed.

        saved_pos = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        appended = self._buffer.write(b)
        self._buffer.seek(saved_pos)
        return self._cursor + appended

    def read(self, size=-1):
        """Read and move the cursor."""
        self._checkClosed()  # Raises ValueError if closed.

        chunk = self._buffer.read(size)
        self._cursor += len(chunk)
        return chunk

    def flush(self):
        """Delete already-read data (all data to the left of the position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # A BytesIO cannot be trimmed from the front, so rebuild it from the
        # unread remainder and rewind to the start.
        unread = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate(0)
        self._buffer.write(unread)
        self._buffer.seek(0)

    def tell(self):
        """Report how many bytes have been read from the buffer in total."""
        return self._cursor

    def seek(self, pos, whence=None):
        """Seeking is deliberately unsupported; the buffer is forward-only."""
        raise io.UnsupportedOperation("seek() is not supported for this class.")

    def __len__(self):
        """Determine the size of the buffer by seeking to the end."""
        saved_pos = self._buffer.tell()
        size = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(saved_pos)
        return size

    def close(self):
        return self._buffer.close()

    def _checkClosed(self):
        return self._buffer._checkClosed()

    @property
    def closed(self):
        return self._buffer.closed
Oops, something went wrong.