Skip to content

Commit

Permalink
Merge pull request #263 from harshavardhana/minio
Browse files Browse the repository at this point in the history
Let's verify early on for UnexpectedShortReads
  • Loading branch information
Harshavardhana committed Aug 17, 2015
2 parents c96b2e9 + 67365c4 commit bb60999
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
8 changes: 8 additions & 0 deletions minio/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ def __str__(self):
string_format = 'InvalidArgumentError: message: {0}'
return string_format.format(self.message)

class UnexpectedShortReadError(Exception):
    """Raised when a read returns fewer bytes than the declared length.

    :param message: human-readable detail about the short read; defaults
        to an empty string so the error can be raised bare.
    """
    def __init__(self, message='', **kwargs):
        # Default value for ``message`` keeps bare call sites such as
        # ``raise UnexpectedShortReadError()`` (used by the uploader)
        # from failing with a TypeError at raise time.
        self.message = message
        super(UnexpectedShortReadError, self).__init__(**kwargs)

    def __str__(self):
        string_format = 'UnexpectedShortReadError: message: {0}'
        return string_format.format(self.message)

class ResponseError(Exception):
def __init__(self, code, message, request_id, host_id, resource, xml=None,
Expand Down
2 changes: 1 addition & 1 deletion minio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def calculate_part_size(length):
"""
minimum_part_size = 1024 * 1024 * 5
maximum_part_size = 1024 * 1024 * 1024 * 5
proposed_part_size = length / 9999
proposed_part_size = length / 9999 ## make sure last part has enough buffer
if proposed_part_size > maximum_part_size:
return maximum_part_size
return max(minimum_part_size, proposed_part_size)
27 changes: 16 additions & 11 deletions minio/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def _do_put_object(self, bucket, key, length, data,
method = 'PUT'

if len(data) != length:
raise DataSizeMismatchError()
raise UnexpectedShortReadError()

if upload_id.strip() and part_number is not 0:
url = get_target_url(self._endpoint_url, bucket=bucket, key=key,
Expand Down Expand Up @@ -622,6 +622,7 @@ def _do_put_object(self, bucket, key, length, data,
return response.headers['etag'].replace('"', '')

def _stream_put_object(self, bucket, key, length, data, content_type):
## TODO handle non blocking streams
if type(data).__name__ != 'file':
if not isinstance(data, io.BufferedReader):
if not isinstance(data, RawIOBase):
Expand All @@ -632,7 +633,6 @@ def _stream_put_object(self, bucket, key, length, data, content_type):
data = io.BufferedReader(data)

part_size = calculate_part_size(length)

current_uploads = ListIncompleteUploads(self._http,
self._endpoint_url,
bucket,
Expand All @@ -646,22 +646,28 @@ def _stream_put_object(self, bucket, key, length, data, content_type):
upload_id = upload.upload_id

uploaded_parts = {}
if upload_id is not None:
if upload_id is None:
upload_id = self._new_multipart_upload(bucket, key, content_type)
else:
part_iter = ListUploadParts(self._http, self._endpoint_url,
bucket, key, upload_id,
access_key=self._access_key,
secret_key=self._secret_key)
for part in part_iter:
uploaded_parts[part.part_number] = part
else:
upload_id = self._new_multipart_upload(bucket, key, content_type)

total_uploaded = 0
current_part_number = 1
etags = []
while total_uploaded < length:
current_data = data.read(part_size)
if len(current_data) == 0:
break
## Throw unexpected short read error
if len(current_data) < part_size:
if (length - total_uploaded) != len(current_data):
raise UnexpectedShortReadError()

current_data_md5 = encode_to_hex(get_md5(current_data))
previously_uploaded_part = None
if current_part_number in uploaded_parts:
Expand All @@ -679,16 +685,18 @@ def _stream_put_object(self, bucket, key, length, data, content_type):
etags.append(etag)
total_uploaded += len(current_data)
current_part_number += 1
if total_uploaded != length:
raise DataSizeMismatchError()

self._complete_multipart_upload(bucket, key, upload_id, etags)

def _drop_incomplete_upload(self, bucket, key, upload_id):
method = 'DELETE'
query = {
'uploadId': upload_id
}
url = get_target_url(self._endpoint_url, bucket=bucket, key=key, query=query)
url = get_target_url(self._endpoint_url,
bucket=bucket,
key=key,
query=query)
headers = {}

headers = sign_v4(method=method, url=url, headers=headers,
Expand Down Expand Up @@ -751,6 +759,3 @@ def _complete_multipart_upload(self, bucket, key, upload_id, etags):

if response.status != 200:
parse_error(response, bucket+"/"+key)

class DataSizeMismatchError(Exception):
    """Raised when the data provided does not match the declared length.

    Derives from ``Exception`` rather than ``BaseException`` so that
    ordinary ``except Exception`` handlers can catch it; only
    interpreter-level signals (e.g. ``KeyboardInterrupt``) should
    subclass ``BaseException`` directly.
    """

0 comments on commit bb60999

Please sign in to comment.