Merge pull request #1779 from tseaver/1760-streaming-multipart_upload_py3k

Ensure that multipart bodies are always bytes.
tseaver committed May 12, 2016
2 parents cd8e645 + 4a8e952 commit 6064270
Showing 4 changed files with 60 additions and 25 deletions.
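A minimal standalone sketch (my own illustration under Python 3, not code from this commit) of the failure this change addresses: the upload body is bytes, so mixing it with a text (str) multipart boundary raises TypeError, which is why the boundary is now converted to bytes wherever it touches the body.

body = b'--==abc==\r\n...binary payload...\r\n--==abc==--'
boundary = '==abc=='                      # str, as Message.get_boundary() returns it
try:
    body.split(boundary)                  # bytes.split(str) fails on Python 3
except TypeError as exc:
    print(exc)                            # "a bytes-like object is required, not 'str'"
print(body.split(boundary.encode('ascii')))   # works once the boundary is bytes
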
13 changes: 10 additions & 3 deletions gcloud/bigquery/table.py
@@ -742,7 +742,7 @@ def upload_from_file(self,
- ``text/csv``.
:type file_obj: file
:param file_obj: A file handle open for reading.
:param file_obj: A file handle opened in binary mode for reading.
:type source_format: str
:param source_format: one of 'CSV' or 'NEWLINE_DELIMITED_JSON'.
@@ -809,8 +809,9 @@ def upload_from_file(self,
:rtype: :class:`gcloud.bigquery.jobs.LoadTableFromStorageJob`
:returns: the job instance used to load the data (e.g., for
querying status)
:raises: :class:`ValueError` if size is not passed in and can not be
determined
:raises: :class:`ValueError` if ``size`` is not passed in and can not
be determined, or if the ``file_obj`` can be detected to be
a file opened in text mode.
"""
client = self._require_client(client)
connection = client.connection
@@ -820,6 +821,12 @@ def upload_from_file(self,
if rewind:
file_obj.seek(0, os.SEEK_SET)

mode = getattr(file_obj, 'mode', None)
if mode is not None and mode != 'rb':
raise ValueError(
"Cannot upload files opened in text mode: use "
"open(filename, mode='rb')")

# Get the basic stats about the file.
total_bytes = size
if total_bytes is None:
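A standalone sketch (hypothetical helper that mirrors the guard added above; not the library API) showing how the mode check behaves for common file objects:

import io

def _check_binary_mode(file_obj):
    # Same logic as the guard added to Table.upload_from_file above.
    mode = getattr(file_obj, 'mode', None)
    if mode is not None and mode != 'rb':
        raise ValueError(
            "Cannot upload files opened in text mode: use "
            "open(filename, mode='rb')")

_check_binary_mode(io.BytesIO(b'data'))        # no ``mode`` attribute: accepted
with open(__file__, 'rb') as fh:
    _check_binary_mode(fh)                     # mode == 'rb': accepted
with open(__file__, 'r') as fh:
    try:
        _check_binary_mode(fh)                 # text mode: raises ValueError
    except ValueError as exc:
        print(exc)

In-memory streams such as io.BytesIO report no ``mode`` attribute, so the guard lets them through.
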
24 changes: 19 additions & 5 deletions gcloud/bigquery/test_table.py
@@ -1289,6 +1289,19 @@ def _row_data(row):
self.assertEqual(req['path'], '/%s' % PATH)
self.assertEqual(req['data'], SENT)

def test_upload_from_file_text_mode_file_failure(self):

class TextModeFile(object):
mode = 'r'

conn = _Connection()
client = _Client(project=self.PROJECT, connection=conn)
dataset = _Dataset(client)
file_obj = TextModeFile()
table = self._makeOne(self.TABLE_NAME, dataset=dataset)
with self.assertRaises(ValueError):
table.upload_from_file(file_obj, 'CSV', size=1234)

def test_upload_from_file_size_failure(self):
conn = _Connection()
client = _Client(project=self.PROJECT, connection=conn)
@@ -1351,12 +1364,14 @@ def _upload_from_file_helper(self, **kw):
return conn.http._requested, PATH, BODY

def test_upload_from_file_w_bound_client_multipart(self):
from email.parser import Parser
import json
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
from gcloud._helpers import _to_bytes
from gcloud.streaming.test_transfer import _email_chunk_parser

requested, PATH, BODY = self._upload_from_file_helper()
parse_chunk = _email_chunk_parser()

self.assertEqual(len(requested), 1)
req = requested[0]
@@ -1369,18 +1384,17 @@ def test_upload_from_file_w_bound_client_multipart(self):
self.assertEqual(dict(parse_qsl(qs)),
{'uploadType': 'multipart'})

parser = Parser()
ctype, boundary = [x.strip()
for x in req['headers']['content-type'].split(';')]
self.assertEqual(ctype, 'multipart/related')
self.assertTrue(boundary.startswith('boundary="=='))
self.assertTrue(boundary.endswith('=="'))

divider = '--' + boundary[len('boundary="'):-1]
divider = b'--' + _to_bytes(boundary[len('boundary="'):-1])
chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog
self.assertEqual(len(chunks), 2)

text_msg = parser.parsestr(chunks[0].strip())
text_msg = parse_chunk(chunks[0].strip())
self.assertEqual(dict(text_msg._headers),
{'Content-Type': 'application/json',
'MIME-Version': '1.0'})
@@ -1394,7 +1408,7 @@ def test_upload_from_file_w_bound_client_multipart(self):
self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE)
self.assertEqual(load_config['sourceFormat'], 'CSV')

app_msg = parser.parsestr(chunks[1].strip())
app_msg = parse_chunk(chunks[1].strip())
self.assertEqual(dict(app_msg._headers),
{'Content-Type': 'application/octet-stream',
'Content-Transfer-Encoding': 'binary',
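A standalone sketch (made-up boundary and body; str.encode('ascii') stands in for gcloud._helpers._to_bytes) of the divider handling the updated test relies on: the boundary is read from the text content-type header, converted to bytes, and only then used to split the bytes request body.

content_type = 'multipart/related; boundary="===0123456789=="'
ctype, boundary = [x.strip() for x in content_type.split(';')]
divider = b'--' + boundary[len('boundary="'):-1].encode('ascii')
body = (b'prolog\n' + divider + b'\npart-one\n' + divider + b'\npart-two\n'
        + divider + b'--\nepilog')
chunks = body.split(divider)[1:-1]        # discard prolog / epilog
print(len(chunks))                        # 2
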
24 changes: 18 additions & 6 deletions gcloud/streaming/test_transfer.py
@@ -1029,7 +1029,7 @@ def test_configure_request_w_simple_wo_body(self):
self.assertEqual(request.loggable_body, '<media body>')

def test_configure_request_w_simple_w_body(self):
from email.parser import Parser
from gcloud._helpers import _to_bytes
from gcloud.streaming.transfer import SIMPLE_UPLOAD
CONTENT = b'CONTENT'
BODY = b'BODY'
@@ -1045,31 +1045,31 @@ def test_configure_request_w_simple_w_body(self):
self.assertEqual(url_builder.query_params, {'uploadType': 'multipart'})
self.assertEqual(url_builder.relative_path, config.simple_path)

parser = Parser()
self.assertEqual(list(request.headers), ['content-type'])
ctype, boundary = [x.strip()
for x in request.headers['content-type'].split(';')]
self.assertEqual(ctype, 'multipart/related')
self.assertTrue(boundary.startswith('boundary="=='))
self.assertTrue(boundary.endswith('=="'))

divider = '--' + boundary[len('boundary="'):-1]
divider = b'--' + _to_bytes(boundary[len('boundary="'):-1])
chunks = request.body.split(divider)[1:-1] # discard prolog / epilog
self.assertEqual(len(chunks), 2)

text_msg = parser.parsestr(chunks[0].strip())
parse_chunk = _email_chunk_parser()
text_msg = parse_chunk(chunks[0].strip())
self.assertEqual(dict(text_msg._headers),
{'Content-Type': 'text/plain',
'MIME-Version': '1.0'})
self.assertEqual(text_msg._payload, BODY.decode('ascii'))

app_msg = parser.parsestr(chunks[1].strip())
app_msg = parse_chunk(chunks[1].strip())
self.assertEqual(dict(app_msg._headers),
{'Content-Type': self.MIME_TYPE,
'Content-Transfer-Encoding': 'binary',
'MIME-Version': '1.0'})
self.assertEqual(app_msg._payload, CONTENT.decode('ascii'))
self.assertTrue('<media body>' in request.loggable_body)
self.assertTrue(b'<media body>' in request.loggable_body)

def test_configure_request_w_resumable_wo_total_size(self):
from gcloud.streaming.transfer import RESUMABLE_UPLOAD
@@ -1784,6 +1784,18 @@ def test__send_chunk_w_total_size_stream_exhausted(self):
self.assertEqual(end, SIZE)


def _email_chunk_parser():
import six
if six.PY3: # pragma: NO COVER Python3
from email.parser import BytesParser
parser = BytesParser()
return parser.parsebytes
else:
from email.parser import Parser
parser = Parser()
return parser.parsestr


class _Dummy(object):
def __init__(self, **kw):
self.__dict__.update(kw)
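A small usage sketch (Python 3 branch only; the chunk bytes are made up) of the parser returned by the _email_chunk_parser helper added above: BytesParser.parsebytes accepts the bytes chunks produced by splitting the body, where Parser.parsestr would require str.

from email.parser import BytesParser

chunk = (b'Content-Type: application/json\n'
         b'MIME-Version: 1.0\n'
         b'\n'
         b'{"sourceFormat": "CSV"}')
msg = BytesParser().parsebytes(chunk)
print(msg['Content-Type'])                # application/json
print(msg.get_payload())                  # {"sourceFormat": "CSV"}
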
24 changes: 13 additions & 11 deletions gcloud/streaming/transfer.py
@@ -9,6 +9,7 @@
import six
from six.moves import http_client

from gcloud._helpers import _to_bytes
from gcloud.streaming.buffered_stream import BufferedStream
from gcloud.streaming.exceptions import CommunicationError
from gcloud.streaming.exceptions import HttpError
@@ -849,24 +850,25 @@ def _configure_multipart_request(self, http_request):
msg.set_payload(self.stream.read())
msg_root.attach(msg)

# NOTE: We encode the body, but can't use
# `email.message.Message.as_string` because it prepends
# `> ` to `From ` lines.
# NOTE: We must use six.StringIO() instead of io.StringIO() since the
# `email` library uses cStringIO in Py2 and io.StringIO in Py3.
stream = six.StringIO()
generator = email_generator.Generator(stream, mangle_from_=False)
# NOTE: generate multipart message as bytes, not text
stream = six.BytesIO()
if six.PY3: # pragma: NO COVER Python3
generator_class = email_generator.BytesGenerator
else:
generator_class = email_generator.Generator
generator = generator_class(stream, mangle_from_=False)
generator.flatten(msg_root, unixfrom=False)
http_request.body = stream.getvalue()

multipart_boundary = msg_root.get_boundary()
http_request.headers['content-type'] = (
'multipart/related; boundary="%s"' % multipart_boundary)

body_components = http_request.body.split(multipart_boundary)
headers, _, _ = body_components[-2].partition('\n\n')
body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
http_request.loggable_body = multipart_boundary.join(body_components)
boundary_bytes = _to_bytes(multipart_boundary)
body_components = http_request.body.split(boundary_bytes)
headers, _, _ = body_components[-2].partition(b'\n\n')
body_components[-2] = b'\n\n'.join([headers, b'<media body>\n\n--'])
http_request.loggable_body = boundary_bytes.join(body_components)

def _configure_resumable_request(self, http_request):
"""Helper for 'configure_request': set up resumable request."""
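A standalone sketch (Python 3, standard library only; the payloads are made up) of the flattening approach _configure_multipart_request now takes: build the MIME tree, flatten it with BytesGenerator into a BytesIO so the body comes out as bytes, and encode the boundary before it touches that body.

import io
from email.generator import BytesGenerator
from email.mime.multipart import MIMEMultipart
from email.mime.nonmultipart import MIMENonMultipart

msg_root = MIMEMultipart('related')

json_part = MIMENonMultipart('application', 'json')
json_part.set_payload('{"sourceFormat": "CSV"}')
msg_root.attach(json_part)

media_part = MIMENonMultipart('application', 'octet-stream')
media_part['Content-Transfer-Encoding'] = 'binary'
media_part.set_payload(b'col1,col2\n1,2\n')      # raw bytes payload
msg_root.attach(media_part)

stream = io.BytesIO()
BytesGenerator(stream, mangle_from_=False).flatten(msg_root, unixfrom=False)
body = stream.getvalue()

assert isinstance(body, bytes)
boundary = msg_root.get_boundary()               # still a str: encode before use
assert len(body.split(b'--' + boundary.encode('ascii'))) == 4  # prolog, 2 parts, epilog

The diff above does the same thing while keeping the Python 2 path on email.generator.Generator, and it builds loggable_body by splitting on the bytes boundary so the media payload can be replaced with a '<media body>' placeholder.
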
